Repository: incubator-s2graph Updated Branches: refs/heads/master b5908311a -> 8dbb9a3ee
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala index fda9991..6054d67 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala @@ -20,27 +20,27 @@ package org.apache.s2graph.core.Integrate import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.utils.logger import play.api.libs.json.{JsObject, Json} class CrudTest extends IntegrateCommon { import CrudHelper._ import TestUtil._ - test("test CRUD") { - var tcNum = 0 - var tcString = "" - var bulkQueries = List.empty[(Long, String, String)] - var expected = Map.empty[String, String] - - val curTime = System.currentTimeMillis - val t1 = curTime + 0 - val t2 = curTime + 1 - val t3 = curTime + 2 - val t4 = curTime + 3 - val t5 = curTime + 4 - - val tcRunner = new CrudTestRunner() - tcNum = 1 + var tcString = "" + var bulkQueries = List.empty[(Long, String, String)] + var expected = Map.empty[String, String] + + val curTime = System.currentTimeMillis + val t1 = curTime + 0 + val t2 = curTime + 1 + val t3 = curTime + 2 + val t4 = curTime + 3 + val t5 = curTime + 4 + + val tcRunner = new CrudTestRunner() + test("1: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") { + val tcNum = 1 tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " bulkQueries = List( @@ -50,8 +50,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 2 + } + test("2: [t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test") { + val tcNum = 2 tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " bulkQueries = List( (t1, "insert", "{\"time\": 10}"), @@ -60,8 +61,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 3 + } + test("3: [t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test") { + val tcNum = 3 tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test " bulkQueries = List( (t3, "insert", "{\"time\": 10, \"weight\": 20}"), @@ -70,8 +72,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 4 + } + test("4: [t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test") { + val tcNum = 4 tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test " bulkQueries = List( (t3, "insert", "{\"time\": 10, \"weight\": 20}"), @@ -80,8 +83,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 5 + } + test("5: [t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test") { + val tcNum = 5 tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test" bulkQueries = List( (t2, "delete", ""), @@ -90,8 +94,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 6 + } + test("6: [t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test") { + val tcNum = 6 tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test " bulkQueries = List( (t2, "delete", ""), @@ -100,8 +105,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 7 + } + test("7: [t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test ") { + val tcNum = 7 tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test " bulkQueries = List( (t1, "update", "{\"time\": 10}"), @@ -110,7 +116,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 8 + } + test("8: [t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test ") { + val tcNum = 8 tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test " bulkQueries = List( (t1, "update", "{\"time\": 10}"), @@ -119,7 +127,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 9 + } + test("9: [t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test") { + val tcNum = 9 tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test " bulkQueries = List( (t2, "delete", ""), @@ -128,7 +138,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 10 + } + test("10: [t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test") { + val tcNum = 10 tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test" bulkQueries = List( (t2, "delete", ""), @@ -137,7 +149,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 11 + } + test("11: [t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test") { + val tcNum = 11 tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test " bulkQueries = List( (t3, "update", "{\"time\": 10, \"weight\": 20}"), @@ -146,7 +160,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - tcNum = 12 + } + test("12: [t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test") { + val tcNum = 12 tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test " bulkQueries = List( (t3, "update", "{\"time\": 10, \"weight\": 20}"), @@ -155,8 +171,9 @@ class CrudTest extends IntegrateCommon { expected = Map("time" -> "10", "weight" -> "20") tcRunner.run(tcNum, tcString, bulkQueries, expected) - - tcNum = 13 + } + test("13: [t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test ") { + val tcNum = 13 tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test " bulkQueries = List( (t5, "update", "{\"is_blocked\": true}"), @@ -169,6 +186,15 @@ class CrudTest extends IntegrateCommon { tcRunner.run(tcNum, tcString, bulkQueries, expected) } + test("14 - test lock expire") { + for { + labelName <- List(testLabelName, testLabelName2) + } { + val id = 0 + tcRunner.expireTC(labelName, id) + } + } + object CrudHelper { @@ -191,7 +217,7 @@ class CrudTest extends IntegrateCommon { val bulkEdges = (for ((ts, op, props) <- opWithProps) yield { TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props) }) - + println(s"${bulkEdges.mkString("\n")}") TestUtil.insertEdgesSync(bulkEdges: _*) for { @@ -210,6 +236,7 @@ class CrudTest extends IntegrateCommon { val jsResult = TestUtil.getEdgesSync(query) val results = jsResult \ "results" + val deegrees = (jsResult \ "degrees").as[List[JsObject]] val propsLs = (results \\ "props").seq (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1) @@ -229,6 +256,63 @@ class CrudTest extends IntegrateCommon { } } + def expireTC(labelName: String, id: Int) = { + var i = 1 + val label = Label.findByName(labelName).get + val serviceName = label.serviceName + val columnName = label.srcColumnName + val id = 0 + + while (i < 1000) { + val bulkEdges = Seq(TestUtil.toEdge(i, "u", "e", id, id, testLabelName, Json.obj("time" -> 10).toString())) + val rets = TestUtil.insertEdgesSync(bulkEdges: _*) + + + val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id) + + if (!rets.forall(identity)) { + Thread.sleep(graph.storage.LockExpireDuration + 100) + /** expect current request would be ignored */ + val bulkEdges = Seq(TestUtil.toEdge(i-1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())) + val rets = TestUtil.insertEdgesSync(bulkEdges: _*) + if (rets.forall(identity)) { + // check + val jsResult = TestUtil.getEdgesSync(queryJson) + (jsResult \\ "time").head.as[Int] should be(10) + logger.debug(jsResult) + i = 100000 + } + } + + i += 1 + } + + i = 1 + while (i < 1000) { + val bulkEdges = Seq(TestUtil.toEdge(i, "u", "e", id, id, testLabelName, Json.obj("time" -> 10).toString())) + val rets = TestUtil.insertEdgesSync(bulkEdges: _*) + + + val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id) + + if (!rets.forall(identity)) { + Thread.sleep(graph.storage.LockExpireDuration + 100) + /** expect current request would be applied */ + val bulkEdges = Seq(TestUtil.toEdge(i+1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())) + val rets = TestUtil.insertEdgesSync(bulkEdges: _*) + if (rets.forall(identity)) { + // check + val jsResult = TestUtil.getEdgesSync(queryJson) + (jsResult \\ "time").head.as[Int] should be(20) + logger.debug(jsResult) + i = 100000 + } + } + + i += 1 + } + } + def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse( s""" { "srcVertices": [ { "serviceName": "$serviceName", @@ -240,6 +324,15 @@ class CrudTest extends IntegrateCommon { "offset": 0, "limit": 10, "cacheTTL": $cacheTTL }]]}""") + + def querySnapshotEdgeJson(serviceName: String, columnName: String, labelName: String, id: Int) = Json.parse( + s""" { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id } ], + "steps": [ [ { + "label": "$labelName", + "_to": $id }]]}""") } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index 225d396..b341ec5 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -43,7 +43,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { config = ConfigFactory.load() graph = new Graph(config)(ExecutionContext.Implicits.global) management = new Management(graph) - parser = new RequestParser(graph.config) + parser = new RequestParser(graph) initTestData() } @@ -120,7 +120,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { def deleteAllSync(jsValue: JsValue) = { val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json => val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json) - val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) + val srcVertices = vertices + val future = graph.deleteAllAdjacentEdges(srcVertices.toList, labels, GraphUtil.directions(direction), ts) future }) @@ -131,10 +132,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { def getEdgesSync(queryJson: JsValue): JsValue = { logger.info(Json.prettyPrint(queryJson)) val restHandler = new RestHandler(graph) - Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime) + val result = Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toJson), HttpRequestWaitingTime) + logger.debug(s"${Json.prettyPrint(result)}") + result } def insertEdgesSync(bulkEdges: String*) = { + logger.debug(s"${bulkEdges.mkString("\n")}") val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) val jsResult = Await.result(req, HttpRequestWaitingTime) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala index 9c52b32..54bb12c 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala @@ -307,7 +307,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { "label": "$testLabelName", "direction": "in", "offset": 0, - "limit": 10 + "limit": 1000 } ]] }""".stripMargin) @@ -328,54 +328,54 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { -// test("pagination and _to") { -// def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( -// s""" -// { "srcVertices": [ -// { "serviceName": "${testServiceName}", -// "columnName": "${testColumnName}", -// "id": ${id} -// }], -// "steps": [ -// [ { -// "label": "${testLabelName}", -// "direction": "out", -// "offset": $offset, -// "limit": $limit, -// "_to": $to -// } -// ]] -// } -// """) -// -// val src = System.currentTimeMillis().toInt -// -// val bulkEdges = Seq( -// toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), -// toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), -// toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), -// toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) -// ) -// insertEdgesSync(bulkEdges: _*) -// -// var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) -// var edges = (result \ "results").as[List[JsValue]] -// -// edges.size should be(2) -// (edges(0) \ "to").as[Long] should be(4) -// (edges(1) \ "to").as[Long] should be(3) -// -// result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) -// -// edges = (result \ "results").as[List[JsValue]] -// edges.size should be(2) -// (edges(0) \ "to").as[Long] should be(3) -// (edges(1) \ "to").as[Long] should be(2) -// -// result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) -// edges = (result \ "results").as[List[JsValue]] -// edges.size should be(1) -// } + test("pagination and _to") { + def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( + s""" + { "srcVertices": [ + { "serviceName": "${testServiceName}", + "columnName": "${testColumnName}", + "id": ${id} + }], + "steps": [ + [ { + "label": "${testLabelName}", + "direction": "out", + "offset": $offset, + "limit": $limit, + "_to": $to + } + ]] + } + """) + + val src = System.currentTimeMillis().toInt + + val bulkEdges = Seq( + toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), + toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), + toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), + toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) + ) + insertEdgesSync(bulkEdges: _*) + + var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) + var edges = (result \ "results").as[List[JsValue]] + + edges.size should be(2) + (edges(0) \ "to").as[Long] should be(4) + (edges(1) \ "to").as[Long] should be(3) + + result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) + + edges = (result \ "results").as[List[JsValue]] + edges.size should be(2) + (edges(0) \ "to").as[Long] should be(3) + (edges(1) \ "to").as[Long] should be(2) + + result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) + edges = (result \ "results").as[List[JsValue]] + edges.size should be(1) + } test("order by") { def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj( @@ -907,7 +907,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { """.stripMargin ) - def querySingleVertexWithOp(id: String, op: String, shrinkageVal: Long) = Json.parse( + def queryWithOp(ids: Seq[String], op: String, shrinkageVal: Long) = Json.parse( s"""{ | "limit": 10, | "groupBy": ["from"], @@ -916,7 +916,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { | { | "serviceName": "$testServiceName", | "columnName": "$testColumnName", - | "id": $id + | "ids": [${ids.mkString(",")}] | } | ], | "steps": [ @@ -949,47 +949,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { """.stripMargin ) - def queryMultiVerticesWithOp(id: String, id2: String, op: String, shrinkageVal: Long) = Json.parse( - s"""{ - | "limit": 10, - | "groupBy": ["from"], - | "duplicate": "sum", - | "srcVertices": [ - | { - | "serviceName": "$testServiceName", - | "columnName": "$testColumnName", - | "ids": [$id, $id2] - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$testLabelName", - | "direction": "out", - | "offset": 0, - | "limit": 10, - | "groupBy": ["from"], - | "duplicate": "countSum", - | "transform": [["_from"]] - | } - | ] - | }, { - | "step": [ - | { - | "label": "$testLabelName2", - | "direction": "out", - | "offset": 0, - | "limit": 10, - | "scorePropagateOp": "$op", - | "scorePropagateShrinkage": $shrinkageVal - | } - | ] - | } - | ] - |} - """.stripMargin - ) val testId = "-30000" val testId2 = "-4000" @@ -1014,15 +973,15 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { val secondStepEdgeCount = 4l var shrinkageVal = 10l - var rs = getEdgesSync(querySingleVertexWithOp(testId, "divide", shrinkageVal)) - logger.debug(Json.prettyPrint(rs)) + var rs = getEdgesSync(queryWithOp(Seq(testId), "divide", shrinkageVal)) + var results = (rs \ "results").as[List[JsValue]] results.size should be(1) var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal) (results(0) \ "scoreSum").as[Double] should be(scoreSum) - rs = getEdgesSync(queryMultiVerticesWithOp(testId, testId2, "divide", shrinkageVal)) - logger.debug(Json.prettyPrint(rs)) + rs = getEdgesSync(queryWithOp(Seq(testId, testId2), "divide", shrinkageVal)) + results = (rs \ "results").as[List[JsValue]] results.size should be(2) scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal) @@ -1033,21 +992,21 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { // check for divide zero case shrinkageVal = 30l rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal)) - logger.debug(Json.prettyPrint(rs)) + results = (rs \ "results").as[List[JsValue]] results.size should be(1) (results(0) \ "scoreSum").as[Double] should be(0) // "plus" operation - rs = getEdgesSync(querySingleVertexWithOp(testId, "plus", shrinkageVal)) - logger.debug(Json.prettyPrint(rs)) + rs = getEdgesSync(queryWithOp(Seq(testId), "plus", shrinkageVal)) + results = (rs \ "results").as[List[JsValue]] results.size should be(1) scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount (results(0) \ "scoreSum").as[Long] should be(scoreSum) // "multiply" operation - rs = getEdgesSync(querySingleVertexWithOp(testId, "multiply", shrinkageVal)) + rs = getEdgesSync(queryWithOp(Seq(testId), "multiply", shrinkageVal)) logger.debug(Json.prettyPrint(rs)) results = (rs \ "results").as[List[JsValue]] results.size should be(1) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala index 99b56f7..6092fe4 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/StrongLabelDeleteTest.scala @@ -141,11 +141,12 @@ class StrongLabelDeleteTest extends IntegrateCommon { test("large degrees") { val labelName = testLabelName2 val dir = "out" + val minSize = 0 val maxSize = 100 val deleteSize = 10 val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } + val src = 1092983 + val tgts = (minSize until maxSize).map { ith => src + ith } val deleteTgts = Random.shuffle(tgts).take(deleteSize) val insertRequests = tgts.map { tgt => Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") @@ -181,11 +182,12 @@ class StrongLabelDeleteTest extends IntegrateCommon { test("deleteAll") { val labelName = testLabelName2 val dir = "out" - val maxSize = 100 + val minSize = 200 + val maxSize = 300 val deleteSize = 10 val numOfConcurrentBatch = 100 - val src = System.currentTimeMillis() - val tgts = (0 until maxSize).map { ith => src + ith } + val src = 192338237 + val tgts = (minSize until maxSize).map { ith => src + ith } val deleteTgts = Random.shuffle(tgts).take(deleteSize) val insertRequests = tgts.map { tgt => Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t") @@ -220,7 +222,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { // val labelName = testLabelName val maxTgtId = 10 val batchSize = 10 - val testNum = 100 + val testNum = 10 val numOfBatch = 10 def testInner(startTs: Long, src: Long) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala index a1bff68..603ca12 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/VertexTestHelper.scala @@ -31,6 +31,8 @@ class VertexTestHelper extends IntegrateCommon { import TestUtil._ import VertexTestHelper._ + + test("vertex") { val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0) val (serviceName, columnName) = (testServiceName, testColumnName) @@ -40,9 +42,10 @@ class VertexTestHelper extends IntegrateCommon { println(payload) val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName)) - Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime) + val srcVertices = vertices + Await.result(graph.mutateVertices(srcVertices, withWait = true), HttpRequestWaitingTime) - val res = graph.getVertices(vertices).map { vertices => + val res = graph.getVertices(srcVertices).map { vertices => PostProcess.verticesToJson(vertices) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala index 3f76d59..d62dee8 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,11 +33,12 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { import WeakLabelDeleteHelper._ test("test weak consistency select") { + insertEdgesSync(bulkEdges(): _*) var result = getEdgesSync(query(0)) - println(result) + (result \ "results").as[List[JsValue]].size should be(4) result = getEdgesSync(query(10)) - println(result) + (result \ "results").as[List[JsValue]].size should be(2) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala index 419e9c4..bab6e03 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala @@ -19,10 +19,11 @@ package org.apache.s2graph.core -import org.apache.s2graph.core.types.{InnerVal, InnerValLike} import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.types.{InnerVal, InnerValLike} import org.scalatest.{FunSuite, Matchers} + class JsonParserTest extends FunSuite with Matchers with TestCommon { import InnerVal._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala index 06af38c..61d1096 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/QueryParamTest.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.types.LabelWithDirection +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, HBaseSerializable, LabelWithDirection} import org.scalatest.{FunSuite, Matchers} class QueryParamTest extends FunSuite with Matchers with TestCommon { @@ -101,5 +101,59 @@ class QueryParamTest extends FunSuite with Matchers with TestCommon { println(s">> diff: $duration") } + test("QueryParam interval min/max bytes padding test") { + import HBaseSerializable._ + val queryParam = QueryParam.Empty + def compare(_from: Seq[InnerValLike], _to: Seq[InnerValLike], _value: Seq[InnerValLike]): Boolean = { + val len = _from.length.toByte + val from = _from.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } + val to = _to.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } + val value = _value.zipWithIndex map { case (innerVal: InnerValLike, idx: Int) => idx.toByte -> innerVal } + + val (fromBytes, toBytes) = queryParam.paddingInterval(len, from, to) + val valueBytes = propsToBytes(value) + + val validFrom = Bytes.compareTo(fromBytes, valueBytes) <= 0 + val validTo = Bytes.compareTo(toBytes, valueBytes) >= 0 + + val res = validFrom && validTo + // if (!res) logger.error(s"from: $validFrom, to: $validTo, from: ${_from} to: ${_to} value: ${_value}") + res + } + + val v = "v3" + compare( + Seq(InnerVal.withLong(0L, v)), + Seq(InnerVal.withLong(0L, v)), + Seq(InnerVal.withLong(0L, v))) shouldBe true + + compare( + Seq(InnerVal.withLong(0L, v)), + Seq(InnerVal.withLong(0L, v)), + Seq(InnerVal.withLong(1L, v))) shouldBe false + + compare( + Seq(InnerVal.withLong(1L, v)), + Seq(InnerVal.withLong(1L, v)), + Seq(InnerVal.withLong(0L, v))) shouldBe false + + compare( + Seq(InnerVal.withLong(0L, v)), + Seq(InnerVal.withLong(1L, v)), + Seq(InnerVal.withLong(2L, v))) shouldBe false + + val testNum = 100000 + val tests = for { + n <- 0 to testNum + min = scala.util.Random.nextInt(Int.MaxValue / 2) + 1 + max = min + scala.util.Random.nextInt(min) + value = min + scala.util.Random.nextInt(max - min + 1) + } yield compare( + Seq(InnerVal.withLong(min, v)), + Seq(InnerVal.withLong(max, v)), + Seq(InnerVal.withLong(value, v))) + + tests.forall(identity) shouldBe true + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala index 33a901d..584a641 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala @@ -146,7 +146,7 @@ trait TestCommonWithModels { isDirected = true, serviceNameV4, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION4, false, "lg4", None) management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType, - isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None) + isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION3, false, "lg4", None) management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala index 8ba9ea2..05dcd30 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/JsonBenchmarkSpec.scala @@ -23,41 +23,55 @@ import play.api.libs.json.JsNumber import play.libs.Json class JsonBenchmarkSpec extends BenchmarkCommon { - "to json" >> { - "json benchmark" >> { - duration("map to json") { - (0 to 10) foreach { n => - val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap - Json.toJson(numberMaps) - } + "JsonBenchSpec" should { + + "Json Append" >> { + import play.api.libs.json.{Json, _} + val numberJson = Json.toJson((0 to 1000).map { i => s"$i" -> JsNumber(i * i) }.toMap).as[JsObject] + + /** dummy warm-up **/ + (0 to 10000) foreach { n => + Json.obj(s"$n" -> "dummy") ++ numberJson + } + (0 to 10000) foreach { n => + Json.obj(s"$n" -> numberJson) } - duration("directMakeJson") { - (0 to 10) foreach { n => - var jsObj = play.api.libs.json.Json.obj() - (0 to 10).foreach { n => - jsObj += (n.toString -> JsNumber(n * n)) - } + duration("Append by JsObj ++ JsObj ") { + (0 to 100000) foreach { n => + numberJson ++ Json.obj(s"$n" -> "dummy") } } - duration("map to json 2") { - (0 to 50) foreach { n => - val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap - Json.toJson(numberMaps) + duration("Append by Json.obj(newJson -> JsObj)") { + (0 to 100000) foreach { n => + Json.obj(s"$n" -> numberJson) } } + true + } + } - duration("directMakeJson 2") { - (0 to 50) foreach { n => - var jsObj = play.api.libs.json.Json.obj() - (0 to 10).foreach { n => - jsObj += (n.toString -> JsNumber(n * n)) - } + "Make Json" >> { + duration("map to json") { + (0 to 10000) foreach { n => + val numberMaps = (0 to 100).map { n => + n.toString -> JsNumber(n * n) + }.toMap + + Json.toJson(numberMaps) + } + } + + duration("direct") { + (0 to 10000) foreach { n => + var jsObj = play.api.libs.json.Json.obj() + + (0 to 100).foreach { n => + jsObj += (n.toString -> JsNumber(n * n)) } } - true } true } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala index a8777fb..ec19641 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/SamplingBenchmarkSpec.scala @@ -1,102 +1,105 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.benchmark - -import scala.annotation.tailrec -import scala.util.Random - -class SamplingBenchmarkSpec extends BenchmarkCommon { - "sample" should { - - "sample benchmark" in { - @tailrec - def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (set.size == n) set - else randomInt(n, range, set + Random.nextInt(range)) - } - - // sample using random array - def randomArraySample[T](num: Int, ls: List[T]): List[T] = { - val randomNum = randomInt(num, ls.size) - var sample = List.empty[T] - var idx = 0 - ls.foreach { e => - if (randomNum.contains(idx)) sample = e :: sample - idx += 1 - } - sample - } - - // sample using shuffle - def shuffleSample[T](num: Int, ls: List[T]): List[T] = { - Random.shuffle(ls).take(num) - } - - // sample using random number generation - def rngSample[T](num: Int, ls: List[T]): List[T] = { - var sampled = List.empty[T] - val N = ls.size // population - var t = 0 // total input records dealt with - var m = 0 // number of items selected so far - - while (m < num) { - val u = Random.nextDouble() - if ((N - t) * u < num - m) { - sampled = ls(t) :: sampled - m += 1 - } - t += 1 - } - sampled - } - - // test data - val testLimit = 10000 - val testNum = 10 - val testData = (0 to 1000).toList - - // dummy for warm-up - (0 to testLimit) foreach { n => - randomArraySample(testNum, testData) - shuffleSample(testNum, testData) - rngSample(testNum, testData) - } - - duration("Random Array Sampling") { - (0 to testLimit) foreach { _ => - val sampled = randomArraySample(testNum, testData) - } - } - - duration("Shuffle Sampling") { - (0 to testLimit) foreach { _ => - val sampled = shuffleSample(testNum, testData) - } - } - - duration("RNG Sampling") { - (0 to testLimit) foreach { _ => - val sampled = rngSample(testNum, testData) - } - } - true - } - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +//package org.apache.s2graph.rest.play.benchmark +// +//import play.api.test.{FakeApplication, PlaySpecification, WithApplication} +// +//import scala.annotation.tailrec +//import scala.util.Random +// +//class SamplingBenchmarkSpec extends BenchmarkCommon with PlaySpecification { +// "sample" should { +// implicit val app = FakeApplication() +// +// "sample benchmark" in new WithApplication(app) { +// @tailrec +// def randomInt(n: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { +// if (set.size == n) set +// else randomInt(n, range, set + Random.nextInt(range)) +// } +// +// // sample using random array +// def randomArraySample[T](num: Int, ls: List[T]): List[T] = { +// val randomNum = randomInt(num, ls.size) +// var sample = List.empty[T] +// var idx = 0 +// ls.foreach { e => +// if (randomNum.contains(idx)) sample = e :: sample +// idx += 1 +// } +// sample +// } +// +// // sample using shuffle +// def shuffleSample[T](num: Int, ls: List[T]): List[T] = { +// Random.shuffle(ls).take(num) +// } +// +// // sample using random number generation +// def rngSample[T](num: Int, ls: List[T]): List[T] = { +// var sampled = List.empty[T] +// val N = ls.size // population +// var t = 0 // total input records dealt with +// var m = 0 // number of items selected so far +// +// while (m < num) { +// val u = Random.nextDouble() +// if ((N - t) * u < num - m) { +// sampled = ls(t) :: sampled +// m += 1 +// } +// t += 1 +// } +// sampled +// } +// +// // test data +// val testLimit = 10000 +// val testNum = 10 +// val testData = (0 to 1000).toList +// +// // dummy for warm-up +// (0 to testLimit) foreach { n => +// randomArraySample(testNum, testData) +// shuffleSample(testNum, testData) +// rngSample(testNum, testData) +// } +// +// duration("Random Array Sampling") { +// (0 to testLimit) foreach { _ => +// val sampled = randomArraySample(testNum, testData) +// } +// } +// +// duration("Shuffle Sampling") { +// (0 to testLimit) foreach { _ => +// val sampled = shuffleSample(testNum, testData) +// } +// } +// +// duration("RNG Sampling") { +// (0 to testLimit) foreach { _ => +// val sampled = rngSample(testNum, testData) +// } +// } +// true +// } +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala index a654a83..c8f65bf 100644 --- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala +++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala @@ -25,7 +25,7 @@ import com.typesafe.config.ConfigFactory import io.netty.bootstrap.ServerBootstrap import io.netty.buffer.{ByteBuf, Unpooled} import io.netty.channel._ -import io.netty.channel.epoll.{EpollServerSocketChannel, EpollEventLoopGroup} +import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel} import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel @@ -36,14 +36,14 @@ import io.netty.util.CharsetUtil import org.apache.s2graph.core.GraphExceptions.BadQueryException import org.apache.s2graph.core.mysqls.Experiment import org.apache.s2graph.core.rest.RestHandler -import org.apache.s2graph.core.rest.RestHandler.HandlerResult +import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult} import org.apache.s2graph.core.utils.Extensions._ import org.apache.s2graph.core.utils.logger import org.apache.s2graph.core.{Graph, JSONParser, PostProcess} import play.api.libs.json._ import scala.collection.mutable -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} import scala.io.Source import scala.util.{Failure, Success, Try} import scala.language.existentials @@ -58,6 +58,10 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends val NotFound = HttpResponseStatus.NOT_FOUND val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR + implicit val nettyHeadersLookup = new CanLookup[HttpHeaders] { + override def lookup(m: HttpHeaders, key: String) = Option(m.get(key)) + } + def badRoute(ctx: ChannelHandlerContext) = simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) @@ -82,7 +86,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends } } - def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = { + def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: String, result: HandlerResult, startedAt: Long) = { var closeOpt = CloseOpt var headers = mutable.ArrayBuilder.make[(String, String)] @@ -119,43 +123,68 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends } } + private def healthCheck(ctx: ChannelHandlerContext)(predicate: Boolean): Unit = { + if (predicate) { + val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8) + simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt) + } else { + simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt) + } + } + + private def updateHealthCheck(ctx: ChannelHandlerContext)(newValue: Boolean)(updateOp: Boolean => Unit): Unit = { + updateOp(newValue) + val newHealthCheckMsg = Unpooled.copiedBuffer(newValue.toString, CharsetUtil.UTF_8) + simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt) + } + override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = { val uri = req.getUri val startedAt = System.currentTimeMillis() - + val checkFunc = healthCheck(ctx) _ + val updateFunc = updateHealthCheck(ctx) _ req.getMethod match { case HttpMethod.GET => uri match { - case "/health_check.html" => - if (NettyServer.isHealthy) { - val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt) + case "/health_check.html" => checkFunc(NettyServer.isHealthy) + case "/fallback_check.html" => checkFunc(NettyServer.isFallbackHealthy) + case "/query_fallback_check.html" => checkFunc(NettyServer.isQueryFallbackHealthy) + case s if s.startsWith("/graphs/getEdge/") => + if (!NettyServer.isQueryFallbackHealthy) { + val result = HandlerResult(body = Future.successful(PostProcess.emptyResults)) + toResponse(ctx, req, s, result, startedAt) } else { - simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt) + val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4) + val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) + val result = s2rest.checkEdges(params) + toResponse(ctx, req, s, result, startedAt) } - - case s if s.startsWith("/graphs/getEdge/") => - // src, tgt, label, dir - val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4) - val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) - val result = s2rest.checkEdges(params) - toResponse(ctx, req, params, result, startedAt) case _ => badRoute(ctx) } case HttpMethod.PUT => if (uri.startsWith("/health_check/")) { - val newHealthCheck = uri.split("/").last.toBoolean - NettyServer.isHealthy = newHealthCheck - val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt) - } else badRoute(ctx) + val newValue = uri.split("/").last.toBoolean + updateFunc(newValue) { v => NettyServer.isHealthy = v } + } else if (uri.startsWith("/query_fallback_check/")) { + val newValue = uri.split("/").last.toBoolean + updateFunc(newValue) { v => NettyServer.isQueryFallbackHealthy = v } + } else if (uri.startsWith("/fallback_check/")) { + val newValue = uri.split("/").last.toBoolean + updateFunc(newValue) { v => NettyServer.isFallbackHealthy = v } + } else { + badRoute(ctx) + } case HttpMethod.POST => val body = req.content.toString(CharsetUtil.UTF_8) - - val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey))) - toResponse(ctx, req, Json.parse(body), result, startedAt) + if (!NettyServer.isQueryFallbackHealthy) { + val result = HandlerResult(body = Future.successful(PostProcess.emptyResults)) + toResponse(ctx, req, body, result, startedAt) + } else { + val result = s2rest.doPost(uri, body, req.headers()) + toResponse(ctx, req, body, result, startedAt) + } case _ => simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) @@ -163,9 +192,14 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends } override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - cause.printStackTrace() - logger.error(s"exception on query.", cause) - simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) + cause match { + case e: java.io.IOException => + ctx.channel().close().addListener(CloseOpt.get) + case _ => + cause.printStackTrace() + logger.error(s"exception on query.", cause) + simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) + } } } @@ -178,6 +212,7 @@ object NettyServer { val config = ConfigFactory.load() val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get val transport = Try(config.getString("netty.transport")).recover { case _ => "jdk" }.get + val maxBodySize = Try(config.getInt("max.body.size")).recover { case _ => 65536 * 2 }.get // init s2graph with config val s2graph = new Graph(config)(ec) @@ -185,6 +220,8 @@ object NettyServer { val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get var isHealthy = config.getBooleanWithFallback("app.health.on", true) + var isFallbackHealthy = true + var isQueryFallbackHealthy = true logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") logger.info(s"transport: $transport") @@ -201,14 +238,13 @@ object NettyServer { try { val b: ServerBootstrap = new ServerBootstrap() .option(ChannelOption.SO_BACKLOG, Int.box(2048)) - b.group(bossGroup, workerGroup).channel(channelClass) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer[SocketChannel] { override def initChannel(ch: SocketChannel) { val p = ch.pipeline() p.addLast(new HttpServerCodec()) - p.addLast(new HttpObjectAggregator(65536)) + p.addLast(new HttpObjectAggregator(maxBodySize)) p.addLast(new S2RestHandler(rest)(ec)) } }) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala index 30a5ee4..692ab1e 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala @@ -52,7 +52,7 @@ object Global extends WithFilters(new GzipFilter()) { // init s2graph with config s2graph = new Graph(config)(ec) storageManagement = new Management(s2graph) - s2parser = new RequestParser(s2graph.config) // merged config + s2parser = new RequestParser(s2graph) s2rest = new RestHandler(s2graph)(ec) logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") @@ -83,7 +83,7 @@ object Global extends WithFilters(new GzipFilter()) { wallLogHandler.shutdown() QueueActor.shutdown() - /* + /** * shutdown hbase client for flush buffers. */ shutdown() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala index 3c488fe..3c49954 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala @@ -43,8 +43,12 @@ object Config { lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost") lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}" + lazy val KAFKA_LOG_TOPIC_JSON = s"s2graphIn${PHASE}Json" lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async" + lazy val KAFKA_LOG_TOPIC_ASYNC_JSON = s"s2graphIn${PHASE}AsyncJson" lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed" + lazy val KAFKA_FAIL_TOPIC_JSON = s"s2graphIn${PHASE}FailedJson" + lazy val KAFKA_MUTATE_FAIL_TOPIC = s"mutateFailed_${PHASE}" // is query or write lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala index 13639b9..b32c16a 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala @@ -22,10 +22,10 @@ package org.apache.s2graph.rest.play.controllers import akka.util.ByteString import org.apache.s2graph.core.GraphExceptions.BadQueryException import org.apache.s2graph.core.PostProcess +import org.apache.s2graph.core.rest.RestHandler.CanLookup import org.apache.s2graph.core.utils.logger import org.apache.s2graph.rest.play.config.Config import play.api.http.HttpEntity -import play.api.libs.iteratee.Enumerator import play.api.libs.json.{JsString, JsValue} import play.api.mvc._ @@ -42,6 +42,10 @@ object ApplicationController extends Controller { val jsonText: BodyParser[String] = s2parse.jsonText + implicit val oneTupleLookup = new CanLookup[Headers] { + override def lookup(m: Headers, key: String) = m.get(key) + } + private def badQueryExceptionResults(ex: Exception) = Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala index 53f3fce..b3ac89d 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala @@ -156,9 +156,11 @@ object CounterController extends Controller { useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit, rateActionId, rateBaseId, rateThreshold) - // prepare exact storage - exactCounter(version).prepare(policy) - if (useRank) { + if (rateAction.isEmpty) { + // prepare exact storage + exactCounter(version).prepare(policy) + } + if (useRank || rateAction.isDefined) { // prepare ranking storage rankingCounter(version).prepare(policy) } @@ -253,8 +255,11 @@ object CounterController extends Controller { // change table name val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_" val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName)) - exactCounter(version).prepare(newPolicy) - if (newPolicy.useRank) { + + if (newPolicy.rateActionId.isEmpty) { + exactCounter(version).prepare(newPolicy) + } + if (newPolicy.useRank || newPolicy.rateActionId.isDefined) { rankingCounter(version).prepare(newPolicy) } Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action"))) @@ -272,8 +277,10 @@ object CounterController extends Controller { policy <- counterModel.findByServiceAction(service, action, useCache = false) } yield { Try { - exactCounter(policy.version).destroy(policy) - if (policy.useRank) { + if (policy.rateActionId.isEmpty) { + exactCounter(policy.version).destroy(policy) + } + if (policy.useRank || policy.rateActionId.isDefined) { rankingCounter(policy.version).destroy(policy) } counterModel.deleteServiceAction(policy) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index da88c3d..8000cf8 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.rest.play.controllers +import com.fasterxml.jackson.databind.JsonMappingException import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.core.rest.RequestParser @@ -30,19 +31,19 @@ import play.api.mvc.{Controller, Result} import scala.collection.Seq import scala.concurrent.Future -import scala.util.Random object EdgeController extends Controller { import ApplicationController._ + import ExceptionHandler._ import play.api.libs.concurrent.Execution.Implicits._ private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler - private def enqueue(topic: String, elem: GraphElement, tsv: String) = { - val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv)) + private def enqueue(topic: String, elem: GraphElement, tsv: String, publishJson: Boolean = false) = { + val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv), publishJson) walLogHandler.enqueue(kafkaMessage) } @@ -50,63 +51,129 @@ object EdgeController extends Controller { val kafkaTopic = toKafkaTopic(graphElem.isAsync) graphElem match { - case v: Vertex => enqueue(kafkaTopic, graphElem, tsv) + case v: Vertex => + enqueue(kafkaTopic, graphElem, tsv) case e: Edge => e.label.extraOptions.get("walLog") match { - case None => enqueue(kafkaTopic, e, tsv) + case None => + enqueue(kafkaTopic, e, tsv) case Some(walLogOpt) => - (walLogOpt \ "method").as[JsValue] match { + (walLogOpt \ "method").get match { case JsString("drop") => // pass case JsString("sample") => val rate = (walLogOpt \ "rate").as[Double] - if (scala.util.Random.nextDouble() < rate) enqueue(kafkaTopic, e, tsv) - case _ => enqueue(kafkaTopic, e, tsv) + if (scala.util.Random.nextDouble() < rate) { + enqueue(kafkaTopic, e, tsv) + } + case _ => + enqueue(kafkaTopic, e, tsv) } } + case _ => logger.error(s"Unknown graph element type: ${graphElem}") } } + private def toDeleteAllFailMessages(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long ) = { + for { + vertex <- srcVertices + id = vertex.id.toString + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") + ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, tsv) + } + } + + private def publishFailTopic(kafkaMessages: Seq[KafkaMessage]): Unit ={ + kafkaMessages.foreach(walLogHandler.enqueue) + } + + def mutateElementsWithFailLog(elements: Seq[(GraphElement, String)]) ={ + val result = s2.mutateElements(elements.map(_._1), true) + result onComplete { results => + results.get.zip(elements).map { + case (false, (e: Edge, tsv: String)) => + val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){ + toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.label), e.labelWithDir.dir, e.ts) + } else{ + Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv))) + } + publishFailTopic(kafkaMessages) + case _ => + } + } + result + } + private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = { if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) else { - try { - elementsWithTsv.foreach { case (graphElem, tsv) => - publish(graphElem, tsv) - } + elementsWithTsv.foreach { case (graphElem, tsv) => + publish(graphElem, tsv) + } - val elementsToStore = for { - (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync) - } yield e - - if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray())) - else { - if (withWait) { - val rets = s2.mutateElements(elementsToStore, withWait) - rets.map(Json.toJson(_)).map(jsonResponse(_)) - } else { - val rets = elementsToStore.map { element => QueueActor.router ! element; true } - Future.successful(jsonResponse(Json.toJson(rets))) + if (elementsWithTsv.isEmpty) Future.successful(jsonResponse(JsArray())) + else { + val elementWithIdxs = elementsWithTsv.zipWithIndex + if (withWait) { + val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) => + !skipElement(element.isAsync) + } + val retToSkip = elementAsync.map(_._2 -> true) + val elementsToStore = elementSync.map(_._1) + val elementsIdxToStore = elementSync.map(_._2) + mutateElementsWithFailLog(elementsToStore).map { rets => + elementsIdxToStore.zip(rets) ++ retToSkip + }.map { rets => + Json.toJson(rets.sortBy(_._1).map(_._2)) + }.map(jsonResponse(_)) + } else { + val rets = elementWithIdxs.map { case ((element, tsv), idx) => + if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv) + true } + Future.successful(jsonResponse(Json.toJson(rets))) } - } catch { - case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) - case e: Throwable => - logger.error(s"tryMutate: ${e.getMessage}", e) - Future.successful(InternalServerError(s"${e.getStackTrace}")) } } } def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = { logger.debug(s"$jsValue") - val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation) - tryMutate(edgesWithTsv, withWait) + + try { + val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation) + tryMutate(edgesWithTsv, withWait) + } catch { + case e: JsonMappingException => + logger.malformed(jsValue, e) + Future.successful(BadRequest(s"${e.getMessage}")) + case e: GraphExceptions.JsonParseException => + logger.malformed(jsValue, e) + Future.successful(BadRequest(s"${e.getMessage}")) + case e: Exception => + logger.malformed(jsValue, e) + Future.failed(e) + } } def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = { logger.debug(s"$str") - val elementsWithTsv = requestParser.parseBulkFormat(str) - tryMutate(elementsWithTsv, withWait) + + try { + val elementsWithTsv = requestParser.parseBulkFormat(str) + tryMutate(elementsWithTsv, withWait) + } catch { + case e: JsonMappingException => + logger.malformed(str, e) + Future.successful(BadRequest(s"${e.getMessage}")) + case e: GraphExceptions.JsonParseException => + logger.malformed(str, e) + Future.successful(BadRequest(s"${e.getMessage}")) + case e: Exception => + logger.malformed(str, e) + Future.failed(e) + } } def mutateBulk() = withHeaderAsync(parse.text) { request => @@ -163,29 +230,34 @@ object EdgeController extends Controller { if (edges.isEmpty) Future.successful(jsonResponse(JsArray())) else { + s2.incrementCounts(edges, withWait = true).map { results => val json = results.map { case (isSuccess, resultCount) => Json.obj("success" -> isSuccess, "result" -> resultCount) } + jsonResponse(Json.toJson(json)) } } } def deleteAll() = withHeaderAsync(jsonParser) { request => -// deleteAllInner(request.body, withWait = false) deleteAllInner(request.body, withWait = true) } + def deleteAllWithOutWait() = withHeaderAsync(jsonParser) { request => + deleteAllInner(request.body, withWait = false) + } + def deleteAllInner(jsValue: JsValue, withWait: Boolean) = { - /* logging for delete all request */ + /** logging for delete all request */ def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = { val kafkaMessages = for { id <- ids label <- labels } yield { - val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t") + val tsv = Seq(ts, "deleteAll", "e", RequestParser.jsToStr(id), RequestParser.jsToStr(id), label.label, "{}", direction).mkString("\t") val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) } ExceptionHandler.toKafkaMessage(topic, tsv) @@ -194,10 +266,18 @@ object EdgeController extends Controller { kafkaMessages.foreach(walLogHandler.enqueue) } - def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = { - enqueueLogMessage(ids, labels, ts, direction, None) + def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], + ts: Long, vertices: Seq[Vertex]) = { + val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) if (withWait) { + future onComplete { + case ret => + if (!ret.get) { + val messages = toDeleteAllFailMessages(vertices.toList, labels, GraphUtil.directions(direction), ts) + publishFailTopic(messages) + } + } future } else { Future.successful(true) @@ -205,9 +285,13 @@ object EdgeController extends Controller { } val deleteFutures = jsValue.as[Seq[JsValue]].map { json => - val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json) + val (_labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json) + val srcVertices = vertices + enqueueLogMessage(ids, _labels, ts, direction, None) + val labels = _labels.filterNot(e => skipElement(e.isAsync)) + if (labels.isEmpty || ids.isEmpty) Future.successful(true) - else deleteEach(labels, direction, ids, ts, vertices) + else deleteEach(labels, direction, ids, ts, srcVertices) } val deleteResults = Future.sequence(deleteFutures) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala index 9c4a061..760211a 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala @@ -19,7 +19,6 @@ package org.apache.s2graph.rest.play.controllers -import org.apache.s2graph.core.mysqls.Experiment import org.apache.s2graph.core.rest.RestHandler import play.api.mvc._ @@ -30,10 +29,10 @@ object ExperimentController extends Controller { import ApplicationController._ + def experiments() = experiment("", "", "") def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request => val body = request.body - - val res = rest.doPost(request.uri, body, request.headers.get(Experiment.impressionKey)) + val res = rest.doPost(request.uri, body, request.headers) res.body.map { case js => val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString) jsonResponse(js, headers: _*) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala index 0260b7a..6a7d4f7 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala @@ -63,12 +63,4 @@ object PublishController extends Controller { def publish(topic: String) = publishOnly(topic) - // def mutateBulk(topic: String) = Action.async(parse.text) { request => - // EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result => - // result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") - // } - // } - def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request => - EdgeController.mutateBulkFormat(request.body) - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala index 495cf7b..1e49ca7 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala @@ -19,8 +19,6 @@ package org.apache.s2graph.rest.play.controllers -import org.apache.s2graph.core.JSONParser -import org.apache.s2graph.core.mysqls.Experiment import org.apache.s2graph.core.rest.RestHandler import play.api.libs.json.Json import play.api.mvc._ @@ -31,13 +29,11 @@ object QueryController extends Controller { import ApplicationController._ import play.api.libs.concurrent.Execution.Implicits.defaultContext - private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest def delegate(request: Request[String]) = { - rest.doPost(request.uri, request.body, request.headers.get(Experiment.impressionKey)).body.map { - js => - jsonResponse(js, "result_size" -> rest.calcSize(js).toString) + rest.doPost(request.uri, request.body, request.headers).body.map { js => + jsonResponse(js, "result_size" -> rest.calcSize(js).toString) } recoverWith ApplicationController.requestFallback(request.body) } @@ -58,13 +54,12 @@ object QueryController extends Controller { def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate) def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = - withHeaderAsync(jsonText) { - request => - val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) - rest.checkEdges(params).body.map { - js => - jsonResponse(js, "result_size" -> rest.calcSize(js).toString) - } recoverWith ApplicationController.requestFallback(request.body) + withHeaderAsync(jsonText) { request => + val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) + rest.checkEdges(params).body.map { + js => + jsonResponse(js, "result_size" -> rest.calcSize(js).toString) + } recoverWith ApplicationController.requestFallback(request.body) } def getVertices() = withHeaderAsync(jsonText)(delegate) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala index 0fdbe43..72e6e82 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala @@ -50,14 +50,16 @@ object VertexController extends Controller { } //FIXME: - val verticesToStore = vertices.filterNot(v => v.isAsync) - - if (withWait) { - val rets = s2.mutateVertices(verticesToStore, withWait = true) - rets.map(Json.toJson(_)).map(jsonResponse(_)) - } else { - val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true } - Future.successful(jsonResponse(Json.toJson(rets))) + val verticesToStore = vertices.filterNot(v => skipElement(v.isAsync)) + if (verticesToStore.isEmpty) Future.successful(jsonResponse(Json.toJson(Seq.empty[Boolean]))) + else { + if (withWait) { + val rets = s2.mutateVertices(verticesToStore, withWait = true) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } } } catch { case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"e")) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/reference.conf ---------------------------------------------------------------------- diff --git a/s2rest_play/conf/reference.conf b/s2rest_play/conf/reference.conf index bda503c..c3b716b 100644 --- a/s2rest_play/conf/reference.conf +++ b/s2rest_play/conf/reference.conf @@ -125,6 +125,7 @@ local.queue.actor.rate.limit=1000000 # local retry number max.retry.number=100 max.back.off=50 +back.off.timeout=1000 delete.all.fetch.size=10000 hbase.fail.prob=-1.0 lock.expire.time=600000 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2rest_play/conf/routes ---------------------------------------------------------------------- diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes index ee49c69..c360e6f 100644 --- a/s2rest_play/conf/routes +++ b/s2rest_play/conf/routes @@ -23,7 +23,7 @@ # publish -POST /publish/:topic org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic) +#POST /publish/:topic org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic) POST /publishOnly/:topic org.apache.s2graph.rest.play.controllers.PublishController.publishOnly(topic) #### Health Check @@ -37,6 +37,7 @@ POST /graphs/edges/insertBulk org.ap POST /graphs/edges/delete org.apache.s2graph.rest.play.controllers.EdgeController.deletes() POST /graphs/edges/deleteWithWait org.apache.s2graph.rest.play.controllers.EdgeController.deletesWithWait() POST /graphs/edges/deleteAll org.apache.s2graph.rest.play.controllers.EdgeController.deleteAll() +POST /graphs/edges/deleteAllWithOutWait org.apache.s2graph.rest.play.controllers.EdgeController.deleteAllWithOutWait() POST /graphs/edges/update org.apache.s2graph.rest.play.controllers.EdgeController.updates() POST /graphs/edges/updateWithWait org.apache.s2graph.rest.play.controllers.EdgeController.updatesWithWait() POST /graphs/edges/increment org.apache.s2graph.rest.play.controllers.EdgeController.increments() @@ -81,7 +82,7 @@ GET /graphs/getLabels/:serviceName org.ap POST /graphs/createLabel org.apache.s2graph.rest.play.controllers.AdminController.createLabel() POST /graphs/addIndex org.apache.s2graph.rest.play.controllers.AdminController.addIndex() GET /graphs/getLabel/:labelName org.apache.s2graph.rest.play.controllers.AdminController.getLabel(labelName) -PUT /graphs/deleteLabel/:labelName org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName) +PUT /graphs/deleteLabelReally/:labelName org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName) POST /graphs/addProp/:labelName org.apache.s2graph.rest.play.controllers.AdminController.addProp(labelName) POST /graphs/createServiceColumn org.apache.s2graph.rest.play.controllers.AdminController.createServiceColumn() @@ -117,7 +118,7 @@ POST /counter/v1/mget org.apac # Experiment API POST /graphs/experiment/:accessToken/:experimentName/:uuid org.apache.s2graph.rest.play.controllers.ExperimentController.experiment(accessToken, experimentName, uuid) - +POST /graphs/experiments org.apache.s2graph.rest.play.controllers.ExperimentController.experiments() # Map static resources from the /public folder to the /assets URL path GET /images/*file controllers.Assets.at(path="/public/images", file)
