http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala new file mode 100644 index 0000000..5b0dfbd --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala @@ -0,0 +1,528 @@ +package com.kakao.s2graph.core.Integrate + +import org.scalatest.BeforeAndAfterEach +import play.api.libs.json._ + +class QueryTest extends IntegrateCommon with BeforeAndAfterEach { + + import TestUtil._ + + val insert = "insert" + val e = "e" + val weight = "weight" + val is_hidden = "is_hidden" + + test("interval") { + def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "index": "$index", + "interval": { + "from": [ { "$prop": $fromVal } ], + "to": [ { "$prop": $toVal } ] + } + } + ]] + } + """) + + var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index + (edges \ "size").toString should be("1") + + edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index + (edges \ "size").toString should be("2") + + edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index + (edges \ "size").toString should be("1") + + edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index + (edges \ "size").toString should be("2") + } + + test("get edge with where condition") { + def queryWhere(id: Int, where: String) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": 0, + "limit": 100, + "where": "${where}" + } + ]] + }""") + + var result = getEdgesSync(queryWhere(0, "is_hidden=false and _from in (-1, 0)")) + (result \ "results").as[List[JsValue]].size should be(1) + + result = getEdgesSync(queryWhere(0, "is_hidden=true and _to in (1)")) + (result \ "results").as[List[JsValue]].size should be(1) + + result = getEdgesSync(queryWhere(0, "_from=0")) + (result \ "results").as[List[JsValue]].size should be(2) + + result = getEdgesSync(queryWhere(2, "_from=2 or weight in (-1)")) + (result \ "results").as[List[JsValue]].size should be(2) + + result = getEdgesSync(queryWhere(2, "_from=2 and weight in (10, 20)")) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("get edge exclude") { + def queryExclude(id: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": 0, + "limit": 2 + }, + { + "label": "${testLabelName}", + "direction": "in", + "offset": 0, + "limit": 2, + "exclude": true + } + ]] + }""") + + val result = getEdgesSync(queryExclude(0)) + (result \ "results").as[List[JsValue]].size should be(1) + } + + test("get edge groupBy property") { + def queryGroupBy(id: Int, props: Seq[String]): JsValue = { + Json.obj( + "groupBy" -> props, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName + ) + ) + ) + ) + ) + } + + val result = getEdgesSync(queryGroupBy(0, Seq("weight"))) + (result \ "size").as[Int] should be(2) + val weights = (result \\ "groupBy").map { js => + (js \ "weight").as[Int] + } + weights should contain(30) + weights should contain(40) + + weights should not contain (10) + } + + test("edge transform") { + def queryTransform(id: Int, transforms: String) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": 0, + "transform": $transforms + } + ]] + }""") + + var result = getEdgesSync(queryTransform(0, "[[\"_to\"]]")) + (result \ "results").as[List[JsValue]].size should be(2) + + result = getEdgesSync(queryTransform(0, "[[\"weight\"]]")) + (result \\ "to").map(_.toString).sorted should be((result \\ "weight").map(_.toString).sorted) + + result = getEdgesSync(queryTransform(0, "[[\"_from\"]]")) + val results = (result \ "results").as[JsValue] + (result \\ "to").map(_.toString).sorted should be((results \\ "from").map(_.toString).sorted) + } + + test("index") { + def queryIndex(ids: Seq[Int], indexName: String) = { + val $from = Json.arr( + Json.obj("serviceName" -> testServiceName, + "columnName" -> testColumnName, + "ids" -> ids)) + + val $step = Json.arr(Json.obj("label" -> testLabelName, "index" -> indexName)) + val $steps = Json.arr(Json.obj("step" -> $step)) + + val js = Json.obj("withScore" -> false, "srcVertices" -> $from, "steps" -> $steps) + js + } + + // weight order + var result = getEdgesSync(queryIndex(Seq(0), "idx_1")) + ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(40)) + + // timestamp order + result = getEdgesSync(queryIndex(Seq(0), "idx_2")) + ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(30)) + } + + // "checkEdges" in { + // running(FakeApplication()) { + // val json = Json.parse( s""" + // [{"from": 0, "to": 1, "label": "$testLabelName"}, + // {"from": 0, "to": 2, "label": "$testLabelName"}] + // """) + // + // def checkEdges(queryJson: JsValue): JsValue = { + // val ret = route(FakeRequest(POST, "/graphs/checkEdges").withJsonBody(queryJson)).get + // contentAsJson(ret) + // } + // + // val res = checkEdges(json) + // val typeRes = res.isInstanceOf[JsArray] + // typeRes must equalTo(true) + // + // val fst = res.as[Seq[JsValue]].head \ "to" + // fst.as[Int] must equalTo(1) + // + // val snd = res.as[Seq[JsValue]].last \ "to" + // snd.as[Int] must equalTo(2) + // } + // } + + test("return tree") { + def queryParents(id: Long) = Json.parse( + s""" + { + "returnTree": true, + "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 2 + } + ],[{ + "label": "$testLabelName", + "direction": "in", + "offset": 0, + "limit": -1 + } + ]] + }""".stripMargin) + + val src = 100 + val tgt = 200 + + insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) + + val result = TestUtil.getEdgesSync(queryParents(src)) + val parents = (result \ "results").as[Seq[JsValue]] + val ret = parents.forall { + edge => (edge \ "parents").as[Seq[JsValue]].size == 1 + } + + ret should be(true) + } + + + + test("pagination and _to") { + def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": $offset, + "limit": $limit, + "_to": $to + } + ]] + } + """) + + val src = System.currentTimeMillis().toInt + + val bulkEdges = Seq( + toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), + toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), + toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), + toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) + ) + insertEdgesSync(bulkEdges: _*) + + var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) + var edges = (result \ "results").as[List[JsValue]] + + edges.size should be(2) + (edges(0) \ "to").as[Long] should be(4) + (edges(1) \ "to").as[Long] should be(3) + + result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) + + edges = (result \ "results").as[List[JsValue]] + edges.size should be(2) + (edges(0) \ "to").as[Long] should be(3) + (edges(1) \ "to").as[Long] should be(2) + + result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) + edges = (result \ "results").as[List[JsValue]] + edges.size should be(1) + } + test("order by") { + def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj( + "srcVertices" -> Json.arr( + Json.obj( + "serviceName" -> testServiceName, + "columnName" -> testColumnName, + "id" -> id + ) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName, + "scoring" -> scoring + ) + ) + ) + ) + ) + def queryOrderBy(id: Int, scoring: Map[String, Int], props: Seq[Map[String, String]]): JsValue = Json.obj( + "orderBy" -> props, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName, + "scoring" -> scoring + ) + ) + ) + ) + ) + + val bulkEdges = Seq( + toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), + toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), + toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), + toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) + ) + + insertEdgesSync(bulkEdges: _*) + + // get edges + val edges = getEdgesSync(queryScore(0, Map("weight" -> 1))) + val orderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC")))) + val ascOrderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC")))) + + val edgesTo = edges \ "results" \\ "to" + val orderByTo = orderByScore \ "results" \\ "to" + val ascOrderByTo = ascOrderByScore \ "results" \\ "to" + + edgesTo should be(Seq(JsNumber(2), JsNumber(1))) + edgesTo should be(orderByTo) + ascOrderByTo should be(Seq(JsNumber(1), JsNumber(2))) + edgesTo.reverse should be(ascOrderByTo) + } + + test("query with sampling") { + def queryWithSampling(id: Int, sample: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 100, + "sample": $sample + }] + } + ] + }""") + + def twoStepQueryWithSampling(id: Int, sample: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 100, + "sample": $sample + }] + }, + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 100, + "sample": $sample + }] + } + ] + }""") + + def twoQueryWithSampling(id: Int, sample: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + { + "step": [{ + "label": "$testLabelName", + "direction": "out", + "offset": 0, + "limit": 50, + "sample": $sample + }, + { + "label": "$testLabelName2", + "direction": "out", + "offset": 0, + "limit": 50 + }] + } + ] + }""") + + val sampleSize = 2 + val ts = "1442985659166" + val testId = 22 + + val bulkEdges = Seq( + toEdge(ts, insert, e, testId, 122, testLabelName), + toEdge(ts, insert, e, testId, 222, testLabelName), + toEdge(ts, insert, e, testId, 322, testLabelName), + + toEdge(ts, insert, e, testId, 922, testLabelName2), + toEdge(ts, insert, e, testId, 222, testLabelName2), + toEdge(ts, insert, e, testId, 322, testLabelName2), + + toEdge(ts, insert, e, 122, 1122, testLabelName), + toEdge(ts, insert, e, 122, 1222, testLabelName), + toEdge(ts, insert, e, 122, 1322, testLabelName), + toEdge(ts, insert, e, 222, 2122, testLabelName), + toEdge(ts, insert, e, 222, 2222, testLabelName), + toEdge(ts, insert, e, 222, 2322, testLabelName), + toEdge(ts, insert, e, 322, 3122, testLabelName), + toEdge(ts, insert, e, 322, 3222, testLabelName), + toEdge(ts, insert, e, 322, 3322, testLabelName) + ) + + insertEdgesSync(bulkEdges: _*) + + val result1 = getEdgesSync(queryWithSampling(testId, sampleSize)) + (result1 \ "results").as[List[JsValue]].size should be(math.min(sampleSize, bulkEdges.size)) + + val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize)) + (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) + + val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize)) + (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3 + } + + def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$testColumnName", + "id": $id + }], + "steps": [ + [ { + "label": "$testLabelName", + "direction": "out", + "offset": $offset, + "limit": $limit + } + ]] + } + """) + + // called by each test, each + override def beforeEach = initTestData() + + // called by start test, once + override def initTestData(): Unit = { + super.initTestData() + + insertEdgesSync( + toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)), + toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)), + toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)), + toEdge(4000, insert, e, 2, 1, testLabelName, Json.obj(weight -> 10)), + toEdge(3000, insert, e, 10, 20, testLabelName, Json.obj(weight -> 20)), + toEdge(4000, insert, e, 20, 20, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, -1, 1000, testLabelName), + toEdge(1, insert, e, -1, 2000, testLabelName), + toEdge(1, insert, e, -1, 3000, testLabelName), + toEdge(1, insert, e, 1000, 10000, testLabelName), + toEdge(1, insert, e, 1000, 11000, testLabelName), + toEdge(1, insert, e, 2000, 11000, testLabelName), + toEdge(1, insert, e, 2000, 12000, testLabelName), + toEdge(1, insert, e, 3000, 12000, testLabelName), + toEdge(1, insert, e, 3000, 13000, testLabelName), + toEdge(1, insert, e, 10000, 100000, testLabelName), + toEdge(2, insert, e, 11000, 200000, testLabelName), + toEdge(3, insert, e, 12000, 300000, testLabelName) + ) + } +}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala new file mode 100644 index 0000000..69a49b4 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala @@ -0,0 +1,282 @@ +package com.kakao.s2graph.core.Integrate + +import java.util.concurrent.TimeUnit + +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.Random + +class StrongLabelDeleteTest extends IntegrateCommon { + + import StrongDeleteUtil._ + import TestUtil._ + + test("Strong consistency select") { + insertEdgesSync(bulkEdges(): _*) + + var result = getEdgesSync(query(0)) + (result \ "results").as[List[JsValue]].size should be(2) + result = getEdgesSync(query(10)) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("Strong consistency deleteAll") { + val deletedAt = 100 + var result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + + println(result) + (result \ "results").as[List[JsValue]].size should be(3) + + val deleteParam = Json.arr( + Json.obj("label" -> testLabelName2, + "direction" -> "in", + "ids" -> Json.arr("20"), + "timestamp" -> deletedAt)) + + deleteAllSync(deleteParam) + + result = getEdgesSync(query(11, direction = "out")) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(12, direction = "out")) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(10, direction = "out")) + println(result) + // 10 -> out -> 20 should not be in result. + (result \ "results").as[List[JsValue]].size should be(1) + (result \\ "to").size should be(1) + (result \\ "to").head.as[String] should be("21") + + result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + + result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) + println(result) + + (result \ "results").as[List[JsValue]].size should be(3) + } + + + test("update delete") { + val ret = for { + i <- 0 until testNum + } yield { + val src = System.currentTimeMillis() + + val (ret, last) = testInner(i, src) + ret should be(true) + ret + } + + ret.forall(identity) + } + + test("update delete 2") { + val src = System.currentTimeMillis() + var ts = 0L + + val ret = for { + i <- 0 until testNum + } yield { + val (ret, lastTs) = testInner(ts, src) + val deletedAt = lastTs + 1 + val deletedAt2 = lastTs + 2 + ts = deletedAt2 + 1 // nex start ts + + ret should be(true) + + val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) + val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2)) + + val deleteRet = deleteAllSync(deleteAllRequest) + val deleteRet2 = deleteAllSync(deleteAllRequest2) + + val result = getEdgesSync(query(id = src)) + println(result) + + val resultEdges = (result \ "results").as[Seq[JsValue]] + resultEdges.isEmpty should be(true) + + val degreeAfterDeleteAll = getDegree(result) + + degreeAfterDeleteAll should be(0) + degreeAfterDeleteAll === (0) + } + + ret.forall(identity) + } + + /** This test stress out test on degree + * when contention is low but number of adjacent edges are large + * Large set of contention test + */ + test("large degrees") { + val labelName = testLabelName2 + val dir = "out" + val maxSize = 100 + val deleteSize = 10 + val numOfConcurrentBatch = 100 + val src = System.currentTimeMillis() + val tgts = (0 until maxSize).map { ith => src + ith } + val deleteTgts = Random.shuffle(tgts).take(deleteSize) + val insertRequests = tgts.map { tgt => + Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val deleteRequests = deleteTgts.take(deleteSize).map { tgt => + Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val allRequests = Random.shuffle(insertRequests ++ deleteRequests) + // val allRequests = insertRequests ++ deleteRequests + val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val expectedDegree = insertRequests.size - deleteRequests.size + val queryJson = query(id = src) + val result = getEdgesSync(queryJson) + val resultSize = (result \ "size").as[Long] + val resultDegree = getDegree(result) + + // println(result) + + val ret = resultSize == expectedDegree && resultDegree == resultSize + println(s"[MaxSize]: $maxSize") + println(s"[DeleteSize]: $deleteSize") + println(s"[ResultDegree]: $resultDegree") + println(s"[ExpectedDegree]: $expectedDegree") + println(s"[ResultSize]: $resultSize") + ret should be(true) + } + + test("deleteAll") { + val labelName = testLabelName2 + val dir = "out" + val maxSize = 100 + val deleteSize = 10 + val numOfConcurrentBatch = 100 + val src = System.currentTimeMillis() + val tgts = (0 until maxSize).map { ith => src + ith } + val deleteTgts = Random.shuffle(tgts).take(deleteSize) + val insertRequests = tgts.map { tgt => + Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val deleteRequests = deleteTgts.take(deleteSize).map { tgt => + Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t") + } + val allRequests = Random.shuffle(insertRequests ++ deleteRequests) + val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val deletedAt = System.currentTimeMillis() + val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt)) + + deleteAllSync(deleteAllRequest) + + val result = getEdgesSync(query(id = src)) + println(result) + val resultEdges = (result \ "results").as[Seq[JsValue]] + resultEdges.isEmpty should be(true) + + val degreeAfterDeleteAll = getDegree(result) + degreeAfterDeleteAll should be(0) + } + + object StrongDeleteUtil { + + val labelName = testLabelName2 + val maxTgtId = 10 + val batchSize = 10 + val testNum = 3 + val numOfBatch = 10 + + def testInner(startTs: Long, src: Long) = { + val labelName = testLabelName2 + val lastOps = Array.fill(maxTgtId)("none") + var currentTs = startTs + + val allRequests = for { + ith <- 0 until numOfBatch + jth <- 0 until batchSize + } yield { + currentTs += 1 + + val tgt = Random.nextInt(maxTgtId) + val op = if (Random.nextDouble() < 0.5) "delete" else "update" + + lastOps(tgt) = op + Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t") + } + + allRequests.foreach(println(_)) + + val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests => + insertEdgesAsync(bulkRequests: _*) + } + + Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) + + val expectedDegree = lastOps.count(op => op != "delete" && op != "none") + val queryJson = query(id = src) + val result = getEdgesSync(queryJson) + val resultSize = (result \ "size").as[Long] + val resultDegree = getDegree(result) + + println(lastOps.toList) + println(result) + + val ret = resultDegree == expectedDegree && resultSize == resultDegree + if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree") + + (ret, currentTs) + } + + def bulkEdges(startTs: Int = 0) = Seq( + toEdge(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""") + ) + + def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName, + labelName: String = testLabelName2, direction: String = "out") = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id + }], + "steps": [ + [ { + "label": "$labelName", + "direction": "${direction}", + "offset": 0, + "limit": -1, + "duplicate": "raw" + } + ]] + }""") + + def getDegree(jsValue: JsValue): Long = { + ((jsValue \ "degrees") \\ "_degree").headOption.map(_.as[Long]).getOrElse(0L) + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala new file mode 100644 index 0000000..ffbec11 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/VertexTestHelper.scala @@ -0,0 +1,71 @@ +package com.kakao.s2graph.core.Integrate + +import com.kakao.s2graph.core.PostProcess +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.Await +import scala.util.Random + + +class VertexTestHelper extends IntegrateCommon { + + import TestUtil._ + import VertexTestHelper._ + + test("vertex") { + val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0) + val (serviceName, columnName) = (testServiceName, testColumnName) + + val data = vertexInsertsPayload(serviceName, columnName, ids) + val payload = Json.parse(Json.toJson(data).toString) + println(payload) + + val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName)) + Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime) + + val res = graph.getVertices(vertices).map { vertices => + PostProcess.verticesToJson(vertices) + } + + val ret = Await.result(res, HttpRequestWaitingTime) + val fetched = ret.as[Seq[JsValue]] + for { + (d, f) <- data.zip(fetched) + } yield { + (d \ "id") should be(f \ "id") + ((d \ "props") \ "age") should be((f \ "props") \ "age") + } + } + + object VertexTestHelper { + def vertexQueryJson(serviceName: String, columnName: String, ids: Seq[Int]) = { + Json.parse( + s""" + |[ + |{"serviceName": "$serviceName", "columnName": "$columnName", "ids": [${ids.mkString(",")} + ]} + |] + """.stripMargin) + } + + def vertexInsertsPayload(serviceName: String, columnName: String, ids: Seq[Int]): Seq[JsValue] = { + ids.map { id => + Json.obj("id" -> id, "props" -> randomProps, "timestamp" -> System.currentTimeMillis()) + } + } + + val vertexPropsKeys = List( + ("age", "int") + ) + + def randomProps() = { + (for { + (propKey, propType) <- vertexPropsKeys + } yield { + propKey -> Random.nextInt(100) + }).toMap + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala new file mode 100644 index 0000000..b80d9c7 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -0,0 +1,129 @@ +package com.kakao.s2graph.core.Integrate + +import java.util.concurrent.TimeUnit + +import org.scalatest.BeforeAndAfterEach +import play.api.libs.json.{JsObject, JsValue, Json} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { + + import TestUtil._ + import WeakLabelDeleteHelper._ + + test("test weak consistency select") { + var result = getEdgesSync(query(0)) + println(result) + (result \ "results").as[List[JsValue]].size should be(4) + result = getEdgesSync(query(10)) + println(result) + (result \ "results").as[List[JsValue]].size should be(2) + } + + test("test weak consistency delete") { + var result = getEdgesSync(query(0)) + println(result) + + /** expect 4 edges */ + (result \ "results").as[List[JsValue]].size should be(4) + val edges = (result \ "results").as[List[JsObject]] + val edgesToStore = parser.toEdges(Json.toJson(edges), "delete") + val rets = graph.mutateEdges(edgesToStore, withWait = true) + Await.result(rets, Duration(20, TimeUnit.MINUTES)) + + /** expect noting */ + result = getEdgesSync(query(0)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + /** insert should be ignored */ + val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert") + val rets2 = graph.mutateEdges(edgesToStore2, withWait = true) + Await.result(rets2, Duration(20, TimeUnit.MINUTES)) + + result = getEdgesSync(query(0)) + (result \ "results").as[List[JsValue]].size should be(0) + } + + + test("test weak consistency deleteAll") { + val deletedAt = 100 + var result = getEdgesSync(query(20, "in", testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(3) + + val json = Json.arr(Json.obj("label" -> testLabelNameWeak, + "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt)) + println(json) + deleteAllSync(json) + + result = getEdgesSync(query(11, "out")) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(12, "out")) + (result \ "results").as[List[JsValue]].size should be(0) + + result = getEdgesSync(query(10, "out")) + + // 10 -> out -> 20 should not be in result. + (result \ "results").as[List[JsValue]].size should be(1) + (result \\ "to").size should be(1) + (result \\ "to").head.as[String] should be("21") + + result = getEdgesSync(query(20, "in", testTgtColumnName)) + println(result) + (result \ "results").as[List[JsValue]].size should be(0) + + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + + result = getEdgesSync(query(20, "in", testTgtColumnName)) + (result \ "results").as[List[JsValue]].size should be(3) + } + + + // called by each test, each + override def beforeEach = initTestData() + + // called by start test, once + + override def initTestData(): Unit = { + super.initTestData() + insertEdgesSync(bulkEdges(): _*) + } + + object WeakLabelDeleteHelper { + + def bulkEdges(startTs: Int = 0) = Seq( + toEdge(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}"""), + toEdge(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}"""), + toEdge(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""), + toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""), + toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""), + toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""") + ) + + def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "$testServiceName", + "columnName": "$columnName", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelNameWeak}", + "direction": "${direction}", + "offset": 0, + "limit": 10, + "duplicate": "raw" + } + ]] + }""") + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/JsonParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/JsonParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/JsonParserTest.scala index 84349cb..127f2fe 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/JsonParserTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/JsonParserTest.scala @@ -3,9 +3,6 @@ package com.kakao.s2graph.core import com.kakao.s2graph.core.types.{InnerValLike, InnerVal} import org.scalatest.{Matchers, FunSuite} -/** - * Created by shon on 5/30/15. - */ class JsonParserTest extends FunSuite with Matchers with TestCommon with JSONParser { import types.HBaseType._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/OrderingUtilTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/OrderingUtilTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/OrderingUtilTest.scala index f6f1520..61818fc 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/OrderingUtilTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/OrderingUtilTest.scala @@ -1,12 +1,9 @@ package com.kakao.s2graph.core -import com.kakao.s2graph.core.OrderingUtil.MultiValueOrdering +import com.kakao.s2graph.core.OrderingUtil._ import org.scalatest.{FunSuite, Matchers} import play.api.libs.json.JsString -/** - * Created by hsleep([email protected]) on 2015. 11. 5.. - */ class OrderingUtilTest extends FunSuite with Matchers { test("test SeqMultiOrdering") { val jsLs: Seq[Seq[Any]] = Seq( @@ -50,7 +47,7 @@ class OrderingUtilTest extends FunSuite with Matchers { ) val ascendingLs: Seq[Boolean] = Seq(false) - val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs)) + val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs)) resultJsLs.toString() should equal(sortedJsLs.toString()) } @@ -74,7 +71,7 @@ class OrderingUtilTest extends FunSuite with Matchers { ) val ascendingLs: Seq[Boolean] = Seq(false, true) - val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs)) + val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs)) resultJsLs.toString() should equal(sortedJsLs.toString()) } @@ -99,7 +96,7 @@ class OrderingUtilTest extends FunSuite with Matchers { ) val ascendingLs: Seq[Boolean] = Seq(true, true, false) - val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs)) + val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs)) resultJsLs.toString() should equal(sortedJsLs.toString()) } @@ -126,7 +123,7 @@ class OrderingUtilTest extends FunSuite with Matchers { ) val ascendingLs: Seq[Boolean] = Seq(true, true, true, false) - val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs)) + val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs)) resultJsLs.toString() should equal(sortedJsLs.toString()) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/TestCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommon.scala index fae3d06..ed9aaa5 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommon.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommon.scala @@ -1,6 +1,6 @@ package com.kakao.s2graph.core - import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.mysqls._ //import com.kakao.s2graph.core.models._ @@ -9,13 +9,7 @@ import org.apache.hadoop.hbase.util.Bytes import org.hbase.async.{PutRequest, KeyValue} -/** - * Created by shon on 6/1/15. - */ trait TestCommon { - - - val ts = System.currentTimeMillis() val testServiceId = 1 val testColumnId = 1 @@ -104,95 +98,95 @@ trait TestCommon { val idxPropsWithTsLsV2 = idxPropsLsV2.map { idxProps => idxProps.map { case (k, v) => k -> InnerValLikeWithTs(v, ts) } } -// -// def testOrder(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], -// innerVals: Iterable[InnerValLike], skipHashBytes: Boolean = false) -// (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable, -// fromBytesFunc: Array[Byte] => HBaseSerializable) = { -// /** check if increasing target vertex id is ordered properly with same indexProps */ -// val rets = for { -// idxProps <- idxPropsLs -// } yield { -// val head = createFunc(idxProps, innerVals.head) -// val start = head -// var prev = head -// val rets = for { -// innerVal <- innerVals.tail -// } yield { -// val current = createFunc(idxProps, innerVal) -// val bytes = current.bytes -// val decoded = fromBytesFunc(bytes) -// println(s"current: $current") -// println(s"decoded: $decoded") -// -// val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes -// val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes -// val (isSame, orderPreserved) = (current, decoded) match { -// case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if (idxProps.map(_._1).contains(toSeq)) => -// /** _to is used in indexProps */ -// (c.props.map(_._2) == d.props.map(_._2) && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0) -// case _ => -// (current == decoded, lessThan(currentBytes, prevBytes)) -// } -// -// println(s"$current ${bytes.toList}") -// println(s"$prev ${prev.bytes.toList}") -// println(s"SerDe[$isSame], Order[$orderPreserved]") -// prev = current -// isSame && orderPreserved -// } -// rets.forall(x => x) -// } -// rets.forall(x => x) -// } -// def testOrderReverse(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], innerVals: Iterable[InnerValLike], -// skipHashBytes: Boolean = false) -// (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable, -// fromBytesFunc: Array[Byte] => HBaseSerializable) = { -// /** check if increasing target vertex id is ordered properly with same indexProps */ -// val rets = for { -// innerVal <- innerVals -// } yield { -// val head = createFunc(idxPropsLs.head, innerVal) -// val start = head -// var prev = head -// val rets = for { -// idxProps <- idxPropsLs.tail -// } yield { -// val current = createFunc(idxProps, innerVal) -// val bytes = current.bytes -// val decoded = fromBytesFunc(bytes) -// println(s"current: $current") -// println(s"decoded: $decoded") -// -// val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes -// val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes -// val (isSame, orderPreserved) = (current, decoded) match { -// case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if (idxProps.map(_._1).contains(toSeq)) => -// /** _to is used in indexProps */ -// (c.props.map(_._2) == d.props.map(_._2) && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0) -// case _ => -// (current == decoded, lessThan(currentBytes, prevBytes)) -// } -// -// println(s"$current ${bytes.toList}") -// println(s"$prev ${prev.bytes.toList}") -// println(s"SerDe[$isSame], Order[$orderPreserved]") -// prev = current -// isSame && orderPreserved -// } -// -// rets.forall(x => x) -// } -// -// rets.forall(x => x) -// } -// -// -// def putToKeyValues(put: PutRequest) = { -// val ts = put.timestamp() -// for ((q, v) <- put.qualifiers().zip(put.values)) yield { -// new KeyValue(put.key(), put.family(), q, ts, v) -// } -// } + // + // def testOrder(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], + // innerVals: Iterable[InnerValLike], skipHashBytes: Boolean = false) + // (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable, + // fromBytesFunc: Array[Byte] => HBaseSerializable) = { + // /** check if increasing target vertex id is ordered properly with same indexProps */ + // val rets = for { + // idxProps <- idxPropsLs + // } yield { + // val head = createFunc(idxProps, innerVals.head) + // val start = head + // var prev = head + // val rets = for { + // innerVal <- innerVals.tail + // } yield { + // val current = createFunc(idxProps, innerVal) + // val bytes = current.bytes + // val decoded = fromBytesFunc(bytes) + // println(s"current: $current") + // println(s"decoded: $decoded") + // + // val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes + // val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes + // val (isSame, orderPreserved) = (current, decoded) match { + // case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if (idxProps.map(_._1).contains(toSeq)) => + // /** _to is used in indexProps */ + // (c.props.map(_._2) == d.props.map(_._2) && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0) + // case _ => + // (current == decoded, lessThan(currentBytes, prevBytes)) + // } + // + // println(s"$current ${bytes.toList}") + // println(s"$prev ${prev.bytes.toList}") + // println(s"SerDe[$isSame], Order[$orderPreserved]") + // prev = current + // isSame && orderPreserved + // } + // rets.forall(x => x) + // } + // rets.forall(x => x) + // } + // def testOrderReverse(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], innerVals: Iterable[InnerValLike], + // skipHashBytes: Boolean = false) + // (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable, + // fromBytesFunc: Array[Byte] => HBaseSerializable) = { + // /** check if increasing target vertex id is ordered properly with same indexProps */ + // val rets = for { + // innerVal <- innerVals + // } yield { + // val head = createFunc(idxPropsLs.head, innerVal) + // val start = head + // var prev = head + // val rets = for { + // idxProps <- idxPropsLs.tail + // } yield { + // val current = createFunc(idxProps, innerVal) + // val bytes = current.bytes + // val decoded = fromBytesFunc(bytes) + // println(s"current: $current") + // println(s"decoded: $decoded") + // + // val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes + // val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes + // val (isSame, orderPreserved) = (current, decoded) match { + // case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if (idxProps.map(_._1).contains(toSeq)) => + // /** _to is used in indexProps */ + // (c.props.map(_._2) == d.props.map(_._2) && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0) + // case _ => + // (current == decoded, lessThan(currentBytes, prevBytes)) + // } + // + // println(s"$current ${bytes.toList}") + // println(s"$prev ${prev.bytes.toList}") + // println(s"SerDe[$isSame], Order[$orderPreserved]") + // prev = current + // isSame && orderPreserved + // } + // + // rets.forall(x => x) + // } + // + // rets.forall(x => x) + // } + // + // + // def putToKeyValues(put: PutRequest) = { + // val ts = put.timestamp() + // for ((q, v) <- put.qualifiers().zip(put.values)) yield { + // new KeyValue(put.key(), put.family(), q, ts, v) + // } + // } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala index fca7dc0..e2a3363 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala @@ -2,12 +2,12 @@ package com.kakao.s2graph.core import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop} import com.kakao.s2graph.core.mysqls._ +import scalikejdbc.AutoSession //import com.kakao.s2graph.core.models._ - import com.kakao.s2graph.core.types.{InnerVal, LabelWithDirection} -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} import scala.concurrent.ExecutionContext @@ -16,9 +16,28 @@ trait TestCommonWithModels { import InnerVal._ import types.HBaseType._ - val config = ConfigFactory.load() + var graph: Graph = _ + var config: Config = _ + + def initTests() = { + config = ConfigFactory.load() + graph = new Graph(config)(ExecutionContext.Implicits.global) + + implicit val session = AutoSession + + deleteTestLabel() + deleteTestService() + + createTestService() + createTestLabel() + } + + def zkQuorum = config.getString("hbase.zookeeper.quorum") + + def cluster = config.getString("hbase.zookeeper.quorum") + + implicit val session = AutoSession - val zkQuorum = config.getString("hbase.zookeeper.quorum") val serviceName = "_test_service" val serviceNameV2 = "_test_service_v2" val columnName = "user_id" @@ -31,7 +50,6 @@ trait TestCommonWithModels { val tgtColumnType = "string" val tgtColumnTypeV2 = "string" - val cluster = config.getString("hbase.zookeeper.quorum") val hTableName = "_test_cases" val preSplitSize = 0 val labelName = "_test_label" @@ -54,37 +72,29 @@ trait TestCommonWithModels { val consistencyLevel = "strong" val hTableTTL = None - val graph = new Graph(config)(ExecutionContext.Implicits.global) - - def initTests() = { - deleteTestLabel() - deleteTestService() - - Thread.sleep(1000) - - createTestService() - createTestLabel() - } def createTestService() = { + implicit val session = AutoSession Management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") Management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") } def deleteTestService() = { + implicit val session = AutoSession Management.deleteService(serviceName) Management.deleteService(serviceNameV2) } def deleteTestLabel() = { + implicit val session = AutoSession Management.deleteLabel(labelName) Management.deleteLabel(labelNameV2) Management.deleteLabel(undirectedLabelName) Management.deleteLabel(undirectedLabelNameV2) } - def createTestLabel() = { + implicit val session = AutoSession Management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType, isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4") @@ -98,31 +108,37 @@ trait TestCommonWithModels { isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4") } - /** */ - initTests() + def service = Service.findByName(serviceName, useCache = false).get + + def serviceV2 = Service.findByName(serviceNameV2, useCache = false).get + + def column = ServiceColumn.find(service.id.get, columnName, useCache = false).get + + def columnV2 = ServiceColumn.find(serviceV2.id.get, columnNameV2, useCache = false).get + + def tgtColumn = ServiceColumn.find(service.id.get, tgtColumnName, useCache = false).get + + def tgtColumnV2 = ServiceColumn.find(serviceV2.id.get, tgtColumnNameV2, useCache = false).get + + def label = Label.findByName(labelName, useCache = false).get + + def labelV2 = Label.findByName(labelNameV2, useCache = false).get + + def undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get - lazy val service = Service.findByName(serviceName, useCache = false).get - lazy val serviceV2 = Service.findByName(serviceNameV2, useCache = false).get + def undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get - lazy val column = ServiceColumn.find(service.id.get, columnName, useCache = false).get - lazy val columnV2 = ServiceColumn.find(serviceV2.id.get, columnNameV2, useCache = false).get + def dir = GraphUtil.directions("out") - lazy val tgtColumn = ServiceColumn.find(service.id.get, tgtColumnName, useCache = false).get - lazy val tgtColumnV2 = ServiceColumn.find(serviceV2.id.get, tgtColumnNameV2, useCache = false).get + def op = GraphUtil.operations("insert") - lazy val label = Label.findByName(labelName, useCache = false).get - lazy val labelV2 = Label.findByName(labelNameV2, useCache = false).get + def labelOrderSeq = LabelIndex.DefaultSeq - lazy val undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get - lazy val undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get + def labelWithDir = LabelWithDirection(label.id.get, dir) - lazy val dir = GraphUtil.directions("out") - lazy val op = GraphUtil.operations("insert") - lazy val labelOrderSeq = LabelIndex.DefaultSeq + def labelWithDirV2 = LabelWithDirection(labelV2.id.get, dir) - lazy val labelWithDir = LabelWithDirection(label.id.get, dir) - lazy val labelWithDirV2 = LabelWithDirection(labelV2.id.get, dir) + def queryParam = QueryParam(labelWithDir) - lazy val queryParam = QueryParam(labelWithDir) - lazy val queryParamV2 = QueryParam(labelWithDirV2) + def queryParamV2 = QueryParam(labelWithDirV2) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala index 478da12..f66eabf 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala @@ -5,43 +5,47 @@ import java.util.concurrent.ExecutorService import com.kakao.s2graph.core.mysqls.{Label, Model} import com.kakao.s2graph.core.{TestCommonWithModels, TestCommon, Graph} import com.typesafe.config.ConfigFactory -import org.scalatest.{FunSuite, Matchers} +import org.scalatest.{BeforeAndAfterAll, Sequential, FunSuite, Matchers} import scala.concurrent.ExecutionContext -/** - * Created by shon on 5/12/15. - */ -class ModelTest extends FunSuite with Matchers with TestCommonWithModels { +class ModelTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll { + override def beforeAll(): Unit = { + initTests() + } + + override def afterAll(): Unit = { + graph.shutdown() + } -// val serviceName = "testService" -// val newServiceName = "newTestService" -// val cluster = "localhost" -// val hbaseTableName = "s2graph-dev" -// val columnName = "user_id" -// val columnType = "long" -// val labelName = "model_test_label" -// val newLabelName = "new_model_test_label" -// val columnMetaName = "is_valid_user" -// val labelMetaName = "is_hidden" -// val hbaseTableTTL = -1 -// val id = 1 -// -// val service = HService(Map("id" -> id, "serviceName" -> serviceName, "cluster" -> cluster, -// "hbaseTableName" -> hbaseTableName, "preSplitSize" -> 0, "hbaseTableTTL" -> -1)) -// val serviceColumn = HServiceColumn(Map("id" -> id, "serviceId" -> service.id.get, -// "columnName" -> columnName, "columnType" -> columnType)) -// val columnMeta = HColumnMeta(Map("id" -> id, "columnId" -> serviceColumn.id.get, "name" -> columnMetaName, "seq" -> 1.toByte)) -// val label = HLabel(Map("id" -> id, "label" -> labelName, -// "srcServiceId" -> service.id.get, "srcColumnName" -> columnName, "srcColumnType" -> columnType, -// "tgtServiceId" -> service.id.get, "tgtColumnName" -> columnName, "tgtColumnType" -> columnType, -// "isDirected" -> true, "serviceName" -> service.serviceName, "serviceId" -> service.id.get, -// "consistencyLevel" -> "weak", "hTableName" -> hbaseTableName, "hTableTTL" -> -1 -// )) -// val labelMeta = HLabelMeta(Map("id" -> id, "labelId" -> label.id.get, "name" -> labelMetaName, "seq" -> 1.toByte, -// "defaultValue" -> false, "dataType" -> "boolean", "usedInIndex" -> false)) -// val labelIndex = HLabelIndex(Map("id" -> id, "labelId" -> label.id.get, "seq" -> 1.toByte, -// "metaSeqs" -> "0", "formular" -> "none")) + // val serviceName = "testService" + // val newServiceName = "newTestService" + // val cluster = "localhost" + // val hbaseTableName = "s2graph-dev" + // val columnName = "user_id" + // val columnType = "long" + // val labelName = "model_test_label" + // val newLabelName = "new_model_test_label" + // val columnMetaName = "is_valid_user" + // val labelMetaName = "is_hidden" + // val hbaseTableTTL = -1 + // val id = 1 + // + // val service = HService(Map("id" -> id, "serviceName" -> serviceName, "cluster" -> cluster, + // "hbaseTableName" -> hbaseTableName, "preSplitSize" -> 0, "hbaseTableTTL" -> -1)) + // val serviceColumn = HServiceColumn(Map("id" -> id, "serviceId" -> service.id.get, + // "columnName" -> columnName, "columnType" -> columnType)) + // val columnMeta = HColumnMeta(Map("id" -> id, "columnId" -> serviceColumn.id.get, "name" -> columnMetaName, "seq" -> 1.toByte)) + // val label = HLabel(Map("id" -> id, "label" -> labelName, + // "srcServiceId" -> service.id.get, "srcColumnName" -> columnName, "srcColumnType" -> columnType, + // "tgtServiceId" -> service.id.get, "tgtColumnName" -> columnName, "tgtColumnType" -> columnType, + // "isDirected" -> true, "serviceName" -> service.serviceName, "serviceId" -> service.id.get, + // "consistencyLevel" -> "weak", "hTableName" -> hbaseTableName, "hTableTTL" -> -1 + // )) + // val labelMeta = HLabelMeta(Map("id" -> id, "labelId" -> label.id.get, "name" -> labelMetaName, "seq" -> 1.toByte, + // "defaultValue" -> false, "dataType" -> "boolean", "usedInIndex" -> false)) + // val labelIndex = HLabelIndex(Map("id" -> id, "labelId" -> label.id.get, "seq" -> 1.toByte, + // "metaSeqs" -> "0", "formular" -> "none")) test("test Label.findByName") { val labelOpt = Label.findByName(labelName, useCache = false) println(labelOpt) @@ -66,66 +70,66 @@ class ModelTest extends FunSuite with Matchers with TestCommonWithModels { val tgtColumn = labelOpt.get.tgtService println(tgtColumn) } -// test("test create") { -// service.create() -// HService.findByName(serviceName, useCache = false) == Some(service) -// -// serviceColumn.create() -// HServiceColumn.findsByServiceId(service.id.get, useCache = false).headOption == Some(serviceColumn) -// -// columnMeta.create() -// HColumnMeta.findByName(serviceColumn.id.get, columnMetaName, useCache = false) == Some(columnMeta) -// -// label.create() -// HLabel.findByName(labelName, useCache = false) == Some(label) -// -// labelMeta.create() -// HLabelMeta.findByName(label.id.get, labelMetaName, useCache = false) == Some(labelMeta) -// -// labelIndex.create() -// HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).headOption == Some(labelIndex) -// } -// -// test("test update") { -// service.update("cluster", "...") -// HService.findById(service.id.get, useCache = false).cluster == "..." -// -// service.update("serviceName", newServiceName) -// assert(HService.findByName(serviceName, useCache = false) == None) -// HService.findByName(newServiceName, useCache = false).map { service => service.id.get == service.id.get} -// -// label.update("label", newLabelName) -// HLabel.findById(label.id.get, useCache = false).label == "newLabelName" -// -// label.update("consistencyLevel", "strong") -// HLabel.findById(label.id.get, useCache = false).consistencyLevel == "strong" && -// HLabel.findByName(newLabelName).isDefined && -// HLabel.findByName(labelName) == None -// -// } -// test("test read by index") { -// val labels = HLabel.findBySrcServiceId(service.id.get, useCache = false) -// val idxs = HLabelIndex.findByLabelIdAll(label.id.get, useCache = false) -// labels.length == 1 && -// labels.head == label -// idxs.length == 1 && -// idxs.head == labelIndex -// } -// test("test delete") { -//// HLabel.findByName(labelName).foreach { label => -//// label.deleteAll() -//// } -// HLabel.findByName(newLabelName).foreach { label => -// label.deleteAll() -// } -// HLabelMeta.findAllByLabelId(label.id.get, useCache = false).isEmpty && -// HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).isEmpty -// -// service.deleteAll() -// } + // test("test create") { + // service.create() + // HService.findByName(serviceName, useCache = false) == Some(service) + // + // serviceColumn.create() + // HServiceColumn.findsByServiceId(service.id.get, useCache = false).headOption == Some(serviceColumn) + // + // columnMeta.create() + // HColumnMeta.findByName(serviceColumn.id.get, columnMetaName, useCache = false) == Some(columnMeta) + // + // label.create() + // HLabel.findByName(labelName, useCache = false) == Some(label) + // + // labelMeta.create() + // HLabelMeta.findByName(label.id.get, labelMetaName, useCache = false) == Some(labelMeta) + // + // labelIndex.create() + // HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).headOption == Some(labelIndex) + // } + // + // test("test update") { + // service.update("cluster", "...") + // HService.findById(service.id.get, useCache = false).cluster == "..." + // + // service.update("serviceName", newServiceName) + // assert(HService.findByName(serviceName, useCache = false) == None) + // HService.findByName(newServiceName, useCache = false).map { service => service.id.get == service.id.get} + // + // label.update("label", newLabelName) + // HLabel.findById(label.id.get, useCache = false).label == "newLabelName" + // + // label.update("consistencyLevel", "strong") + // HLabel.findById(label.id.get, useCache = false).consistencyLevel == "strong" && + // HLabel.findByName(newLabelName).isDefined && + // HLabel.findByName(labelName) == None + // + // } + // test("test read by index") { + // val labels = HLabel.findBySrcServiceId(service.id.get, useCache = false) + // val idxs = HLabelIndex.findByLabelIdAll(label.id.get, useCache = false) + // labels.length == 1 && + // labels.head == label + // idxs.length == 1 && + // idxs.head == labelIndex + // } + // test("test delete") { + //// HLabel.findByName(labelName).foreach { label => + //// label.deleteAll() + //// } + // HLabel.findByName(newLabelName).foreach { label => + // label.deleteAll() + // } + // HLabelMeta.findAllByLabelId(label.id.get, useCache = false).isEmpty && + // HLabelIndex.findByLabelIdAll(label.id.get, useCache = false).isEmpty + // + // service.deleteAll() + // } -// test("test labelIndex") { -// println(HLabelIndex.findByLabelIdAll(1)) -// } + // test("test labelIndex") { + // println(HLabelIndex.findByLabelIdAll(1)) + // } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/mysqls/ExperimentSpec.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/mysqls/ExperimentSpec.scala b/s2core/src/test/scala/com/kakao/s2graph/core/mysqls/ExperimentSpec.scala index 2bc764d..078b5a9 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/mysqls/ExperimentSpec.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/mysqls/ExperimentSpec.scala @@ -6,9 +6,6 @@ import com.typesafe.config.ConfigFactory import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import scalikejdbc._ -/** - * Created by hsleep on 2015. 11. 30.. - */ class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val Ttl = 2 override def beforeAll(): Unit = { @@ -19,37 +16,7 @@ class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val props = new Properties() props.setProperty("cache.ttl.seconds", Ttl.toString) Model.apply(ConfigFactory.load(ConfigFactory.parseProperties(props))) - /* -CREATE TABLE `experiments` ( - `id` integer NOT NULL AUTO_INCREMENT, - `service_id` integer NOT NULL, - `service_name` varchar(128) NOT NULL, - `name` varchar(64) NOT NULL, - `description` varchar(255) NOT NULL, - `experiment_type` varchar(8) NOT NULL DEFAULT 'u', - `total_modular` int NOT NULL DEFAULT 100, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_service_id_name` (`service_id`, `name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -CREATE TABLE `buckets` ( - `id` integer NOT NULL AUTO_INCREMENT, - `experiment_id` integer NOT NULL, - `uuid_mods` varchar(64) NOT NULL, - `traffic_ratios` varchar(64) NOT NULL, - `http_verb` varchar(8) NOT NULL, - `api_path` text NOT NULL, - `uuid_key` varchar(128), - `uuid_placeholder` varchar(64), - `request_body` text NOT NULL, - `timeout` int NOT NULL DEFAULT 1000, - `impression_id` varchar(64) NOT NULL, - `is_graph_query` tinyint NOT NULL DEFAULT 1, - PRIMARY KEY (`id`), - UNIQUE KEY `ux_impression_id` (`impression_id`), - INDEX `idx_experiment_id` (`experiment_id`), - INDEX `idx_impression_id` (`impression_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - */ + implicit val session = AutoSession sql"""DELETE FROM buckets""".update().apply() sql"""DELETE FROM experiments""".update().apply() @@ -58,6 +25,7 @@ CREATE TABLE `buckets` ( sql"""INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, impression_id) VALUES($expId, "1~100", "POST", "/a/b/c", "None", "imp1")""".update().apply() + } "Experiment" should "find bucket list" in { @@ -70,7 +38,7 @@ CREATE TABLE `buckets` ( } it should "update bucket list after cache ttl time" in { - Experiment.findBy(1, "exp1").foreach { exp => + Experiment.findBy(1, "exp1").foreach { exp => val bucket = exp.buckets.head bucket.impressionId should equal("imp1") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala index 73d0d76..393d8b1 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala @@ -8,7 +8,6 @@ import play.api.libs.json.Json class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { // dummy data for dummy edge - import HBaseType.{VERSION1, VERSION2} val ts = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala index b404559..38b5b99 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala @@ -1,15 +1,16 @@ package com.kakao.s2graph.core.storage.hbase import com.kakao.s2graph.core.Graph +import com.typesafe.config.ConfigFactory + import org.apache.hadoop.hbase.util.Bytes import org.hbase.async.GetRequest import org.scalatest.{FunSuite, Matchers} -import scala.concurrent.ExecutionContext.Implicits.global -/** - * Created by hsleep([email protected]) on 2015. 11. 9.. - */ +import scala.concurrent.ExecutionContext + class AsynchbaseQueryBuilderTest extends FunSuite with Matchers { + val dummyRequests = { for { id <- 0 until 1000 @@ -18,7 +19,8 @@ class AsynchbaseQueryBuilderTest extends FunSuite with Matchers { } } - val config = Graph.DefaultConfig + implicit val ec = ExecutionContext.Implicits.global + val config = ConfigFactory.load() val graph = new Graph(config) val qb = new AsynchbaseQueryBuilder(graph.storage.asInstanceOf[AsynchbaseStorage]) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/types/InnerValTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/types/InnerValTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/types/InnerValTest.scala index c0c242b..69a299d 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/types/InnerValTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/types/InnerValTest.scala @@ -1,14 +1,14 @@ package com.kakao.s2graph.core.types +import com.kakao.s2graph.core.TestCommonWithModels import com.kakao.s2graph.core.types._ import org.apache.hadoop.hbase.util.{Bytes, OrderedBytes, SimplePositionedByteRange} import org.scalatest.{Matchers, FunSuite} import play.api.libs.json.Json -/** - * Created by shon on 5/28/15. - */ -class InnerValTest extends FunSuite with Matchers { +class InnerValTest extends FunSuite with Matchers with TestCommonWithModels { + initTests() + import HBaseType.{VERSION2, VERSION1} val decimals = List( BigDecimal(Long.MinValue), @@ -44,52 +44,52 @@ class InnerValTest extends FunSuite with Matchers { for { innerVal <- ranges } { - val bytes = innerVal.bytes - val (decoded, numOfBytesUsed) = InnerVal.fromBytes(bytes, 0, bytes.length, version) - innerVal == decoded shouldBe true - bytes.length == numOfBytesUsed shouldBe true - } + val bytes = innerVal.bytes + val (decoded, numOfBytesUsed) = InnerVal.fromBytes(bytes, 0, bytes.length, version) + innerVal == decoded shouldBe true + bytes.length == numOfBytesUsed shouldBe true + } } -// test("big decimal") { -// for { -// version <- List(VERSION2, VERSION1) -// } { -// val innerVals = decimals.map { num => InnerVal.withNumber(num, version)} -// testEncodeDecode(innerVals, version) -// } -// } -// test("text") { -// for { -// version <- List(VERSION2) -// } { -// val innerVals = texts.map { t => InnerVal.withStr(t, version) } -// testEncodeDecode(innerVals, version) -// } -// } -// test("string") { -// for { -// version <- List(VERSION2, VERSION1) -// } { -// val innerVals = strings.map { t => InnerVal.withStr(t, version) } -// testEncodeDecode(innerVals, version) -// } -// } -// test("blob") { -// for { -// version <- List(VERSION2) -// } { -// val innerVals = blobs.map { t => InnerVal.withBlob(t, version) } -// testEncodeDecode(innerVals, version) -// } -// } -// test("boolean") { -// for { -// version <- List(VERSION2, VERSION1) -// } { -// val innerVals = booleans.map { t => InnerVal.withBoolean(t, version) } -// testEncodeDecode(innerVals, version) -// } -// } + // test("big decimal") { + // for { + // version <- List(VERSION2, VERSION1) + // } { + // val innerVals = decimals.map { num => InnerVal.withNumber(num, version)} + // testEncodeDecode(innerVals, version) + // } + // } + // test("text") { + // for { + // version <- List(VERSION2) + // } { + // val innerVals = texts.map { t => InnerVal.withStr(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + // test("string") { + // for { + // version <- List(VERSION2, VERSION1) + // } { + // val innerVals = strings.map { t => InnerVal.withStr(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + // test("blob") { + // for { + // version <- List(VERSION2) + // } { + // val innerVals = blobs.map { t => InnerVal.withBlob(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } + // test("boolean") { + // for { + // version <- List(VERSION2, VERSION1) + // } { + // val innerVals = booleans.map { t => InnerVal.withBoolean(t, version) } + // testEncodeDecode(innerVals, version) + // } + // } test("korean") { val small = InnerVal.withStr("ê°", VERSION2) val large = InnerVal.withStr("ë", VERSION2) @@ -99,32 +99,32 @@ class InnerValTest extends FunSuite with Matchers { println (Bytes.compareTo(smallBytes, largeBytes)) true } -// test("innerVal") { -// val srcVal = InnerVal.withLong(44391298, VERSION2) -// val srcValV1 = InnerVal.withLong(44391298, VERSION1) -// val tgtVal = InnerVal.withLong(7295564, VERSION2) -// -// val a = VertexId(0, srcVal) -// val b = SourceVertexId(0, srcVal) -// val c = TargetVertexId(0, srcVal) -// val aa = VertexId(0, srcValV1) -// val bb = SourceVertexId(0, srcValV1) -// val cc = TargetVertexId(0, srcValV1) -// println(a.bytes.toList) -// println(b.bytes.toList) -// println(c.bytes.toList) -// -// println(aa.bytes.toList) -// println(bb.bytes.toList) -// println(cc.bytes.toList) -// } -// test("aa") { -// val bytes = InnerVal.withLong(Int.MaxValue, VERSION2).bytes -// val pbr = new SimplePositionedByteRange(bytes) -// pbr.setOffset(1) -// println(pbr.getPosition) -// val num = OrderedBytes.decodeNumericAsBigDecimal(pbr) -// println(pbr.getPosition) -// true -// } + // test("innerVal") { + // val srcVal = InnerVal.withLong(44391298, VERSION2) + // val srcValV1 = InnerVal.withLong(44391298, VERSION1) + // val tgtVal = InnerVal.withLong(7295564, VERSION2) + // + // val a = VertexId(0, srcVal) + // val b = SourceVertexId(0, srcVal) + // val c = TargetVertexId(0, srcVal) + // val aa = VertexId(0, srcValV1) + // val bb = SourceVertexId(0, srcValV1) + // val cc = TargetVertexId(0, srcValV1) + // println(a.bytes.toList) + // println(b.bytes.toList) + // println(c.bytes.toList) + // + // println(aa.bytes.toList) + // println(bb.bytes.toList) + // println(cc.bytes.toList) + // } + // test("aa") { + // val bytes = InnerVal.withLong(Int.MaxValue, VERSION2).bytes + // val pbr = new SimplePositionedByteRange(bytes) + // pbr.setOffset(1) + // println(pbr.getPosition) + // val num = OrderedBytes.decodeNumericAsBigDecimal(pbr) + // println(pbr.getPosition) + // true + // } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala new file mode 100644 index 0000000..a661ac3 --- /dev/null +++ b/s2rest_play/app/Bootstrap.scala @@ -0,0 +1,77 @@ +package com.kakao.s2graph.rest + +import java.util.concurrent.Executors + +import actors.QueueActor +import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.utils.logger +import com.kakao.s2graph.core.{ExceptionHandler, Graph} +import config.Config +import controllers.{AdminController, ApplicationController} +import play.api.Application +import play.api.mvc.{WithFilters, _} +import play.filters.gzip.GzipFilter + +import scala.concurrent.{ExecutionContext, Future} +import scala.io.Source +import scala.util.Try + +object Global extends WithFilters(new GzipFilter()) { + var s2graph: Graph = _ + var s2parser: RequestParser = _ + + // Application entry point + override def onStart(app: Application) { + ApplicationController.isHealthy = false + + val numOfThread = Runtime.getRuntime.availableProcessors() + val threadPool = Executors.newFixedThreadPool(numOfThread) + val ec = ExecutionContext.fromExecutor(threadPool) + + val config = Config.conf.underlying + + // init s2graph with config + s2graph = new Graph(config)(ec) + s2parser = new RequestParser(s2graph.config) // merged config + + QueueActor.init(s2graph) + + if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) { + ExceptionHandler.apply(config) + } + + val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true) + ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get + + ApplicationController.isHealthy = defaultHealthOn + logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") + } + + override def onStop(app: Application) { + QueueActor.shutdown() + + if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) { + ExceptionHandler.shutdown() + } + + /** + * shutdown hbase client for flush buffers. + */ + s2graph.shutdown() + } + + override def onError(request: RequestHeader, ex: Throwable): Future[Result] = { + logger.error(s"onError => ip:${request.remoteAddress}, request:${request}", ex) + Future.successful(Results.InternalServerError) + } + + override def onHandlerNotFound(request: RequestHeader): Future[Result] = { + logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, request:${request}") + Future.successful(Results.NotFound) + } + + override def onBadRequest(request: RequestHeader, error: String): Future[Result] = { + logger.error(s"onBadRequest => ip:${request.remoteAddress}, request:$request, error:$error") + Future.successful(Results.BadRequest(error)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/actors/QueueActor.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/actors/QueueActor.scala b/s2rest_play/app/actors/QueueActor.scala new file mode 100644 index 0000000..74bc65d --- /dev/null +++ b/s2rest_play/app/actors/QueueActor.scala @@ -0,0 +1,92 @@ +package actors + +import java.util.concurrent.TimeUnit + +import actors.Protocol.FlushAll +import akka.actor._ +import com.kakao.s2graph.core.ExceptionHandler._ +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.utils.logger +import config.Config +import play.api.Play.current +import play.api.libs.concurrent.Akka + +import scala.collection.mutable +import scala.concurrent.duration.Duration + +/** + * Created by shon on 9/2/15. + */ +object Protocol { + + case object Flush + + case object FlushAll + +} + +object QueueActor { + /** we are throttling down here so fixed number of actor to constant */ + var router: ActorRef = _ + + // Akka.system.actorOf(props(), name = "queueActor") + def init(s2: Graph) = { + router = Akka.system.actorOf(props(s2)) + } + + def shutdown() = { + router ! FlushAll + Akka.system.shutdown() + Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2) + } + + def props(s2: Graph): Props = Props(classOf[QueueActor], s2) +} + +class QueueActor(s2: Graph) extends Actor with ActorLogging { + + import Protocol._ + + implicit val ec = context.system.dispatcher + // logger.error(s"QueueActor: $self") + val queue = mutable.Queue.empty[GraphElement] + var queueSize = 0L + val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE + val timeUnitInMillis = 10 + val rateLimitTimeStep = 1000 / timeUnitInMillis + val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep + + + context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush) + + override def receive: Receive = { + case element: GraphElement => + + if (queueSize > maxQueueSize) { + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None)) + } else { + queueSize += 1L + queue.enqueue(element) + } + + case Flush => + val elementsToFlush = + if (queue.size < rateLimit) queue.dequeueAll(_ => true) + else (0 until rateLimit).map(_ => queue.dequeue()) + + val flushSize = elementsToFlush.size + + queueSize -= elementsToFlush.length + s2.mutateElements(elementsToFlush) + + if (flushSize > 0) { + logger.info(s"flush: $flushSize, $queueSize") + } + + case FlushAll => + s2.mutateElements(queue) + context.stop(self) + + case _ => logger.error("unknown protocol") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/config/Config.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/config/Config.scala b/s2rest_play/app/config/Config.scala new file mode 100644 index 0000000..98b87c5 --- /dev/null +++ b/s2rest_play/app/config/Config.scala @@ -0,0 +1,41 @@ +package config + +import play.api.Play + +object Config { + // HBASE + lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost") + + + // HBASE CLIENT + lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort + lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000) + lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3) + + // PHASE + lazy val PHASE = conf.getString("phase").getOrElse("dev") + lazy val conf = Play.current.configuration + + // CACHE + lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600) + lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000) + + //KAFKA + lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost") + lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0) + lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}" + lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async" + lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed" + + // is query or write + lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true) + lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true) + + + // query limit per step + lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300) + + // local queue actor + lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("local.queue.actor.max.queue.size").getOrElse(10000) + lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("local.queue.actor.rate.limit").getOrElse(1000) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/config/CounterConfig.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/config/CounterConfig.scala b/s2rest_play/app/config/CounterConfig.scala new file mode 100644 index 0000000..2569d55 --- /dev/null +++ b/s2rest_play/app/config/CounterConfig.scala @@ -0,0 +1,10 @@ +package config + +/** + * Created by hsleep([email protected]) on 15. 9. 3.. + */ +object CounterConfig { + // kafka + lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}" + lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}" +}
