Repository: incubator-s2graph Updated Branches: refs/heads/master 36d5485bd -> e207f676f
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala new file mode 100644 index 0000000..17d9e8f --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala @@ -0,0 +1,82 @@ +package com.kakao.s2graph.core.utils + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.CacheBuilder +import com.typesafe.config.Config + +import scala.concurrent.{Promise, Future, ExecutionContext} + + +class FutureCache[R](config: Config)(implicit ex: ExecutionContext) { + + type Value = (Long, Future[R]) + + private val maxSize = config.getInt("future.cache.max.size") + private val expireAfterWrite = config.getInt("future.cache.expire.after.write") + private val expireAfterAccess = config.getInt("future.cache.expire.after.access") + + private val futureCache = CacheBuilder.newBuilder() + .initialCapacity(maxSize) + .concurrencyLevel(Runtime.getRuntime.availableProcessors()) + .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) + .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) + .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]() + + + def asMap() = futureCache.asMap() + + def getIfPresent(cacheKey: Long): Value = { + val (cachedAt, promise) = futureCache.getIfPresent(cacheKey) + (cachedAt, promise.future) + } + + private def checkAndExpire(cacheKey: Long, + cachedAt: Long, + cacheTTL: Long, + oldFuture: Future[R])(op: => Future[R]): Future[R] = { + if (System.currentTimeMillis() >= cachedAt + cacheTTL) { + // future is too old. so need to expire and fetch new data from storage. + futureCache.asMap().remove(cacheKey) + + val newPromise = Promise[R] + val now = System.currentTimeMillis() + + futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { + case null => + // only one thread succeed to come here concurrently + // initiate fetch to storage then add callback on complete to finish promise. + op.onSuccess { case value => + newPromise.success(value) + value + } + newPromise.future + case (cachedAt, oldPromise) => oldPromise.future + } + } else { + // future is not to old so reuse it. + oldFuture + } + } + def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = { + val cacheVal = futureCache.getIfPresent(cacheKey) + cacheVal match { + case null => + val promise = Promise[R] + val now = System.currentTimeMillis() + val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { + case null => + op.onSuccess { case value => + promise.success(value) + value + } + (now, promise) + case oldVal => oldVal + } + checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) + + case (cachedAt, cachedPromise) => + checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala index 438b97d..1c09778 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala @@ -173,7 +173,7 @@ class CrudTest extends IntegrateCommon { TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props) }) - TestUtil.mutateEdgesSync(bulkEdges: _*) + TestUtil.insertEdgesSync(bulkEdges: _*) for { label <- Label.findByName(labelName) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala index 751a7a7..ae9e514 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala @@ -2,7 +2,7 @@ package com.kakao.s2graph.core.Integrate import com.kakao.s2graph.core._ import com.kakao.s2graph.core.mysqls.Label -import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.rest.{RequestParser, RestHandler} import com.kakao.s2graph.core.utils.logger import com.typesafe.config._ import org.scalatest._ @@ -33,8 +33,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { } /** - * Make Service, Label, Vertex for integrate test - */ + * Make Service, Label, Vertex for integrate test + */ def initTestData() = { println("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") Management.deleteService(testServiceName) @@ -82,8 +82,8 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { /** - * Test Helpers - */ + * Test Helpers + */ object TestUtil { implicit def ec = scala.concurrent.ExecutionContext.global @@ -111,22 +111,18 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { def getEdgesSync(queryJson: JsValue): JsValue = { logger.info(Json.prettyPrint(queryJson)) - - val ret = graph.getEdges(parser.toQuery(queryJson)) - val result = Await.result(ret, HttpRequestWaitingTime) - val jsResult = PostProcess.toSimpleVertexArrJson(result) - - jsResult + val restHandler = new RestHandler(graph) + Await.result(restHandler.getEdgesAsync(queryJson)(PostProcess.toSimpleVertexArrJson), HttpRequestWaitingTime) } - def mutateEdgesSync(bulkEdges: String*) = { + def insertEdgesSync(bulkEdges: String*) = { val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) val jsResult = Await.result(req, HttpRequestWaitingTime) jsResult } - def mutateEdgesAsync(bulkEdges: String*) = { + def insertEdgesAsync(bulkEdges: String*) = { val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) req } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala index 0d2d82e..0b26608 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/QueryTest.scala @@ -1,14 +1,17 @@ package com.kakao.s2graph.core.Integrate +import com.kakao.s2graph.core.GraphExceptions.BadQueryException +import com.kakao.s2graph.core.utils.logger import org.scalatest.BeforeAndAfterEach -import play.api.libs.json._ +import play.api.libs.json.{JsNull, JsNumber, JsValue, Json} + +import scala.util.{Success, Try} class QueryTest extends IntegrateCommon with BeforeAndAfterEach { import TestUtil._ val insert = "insert" - val delete = "delete" val e = "e" val weight = "weight" val is_hidden = "is_hidden" @@ -132,7 +135,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { val result = getEdgesSync(queryGroupBy(0, Seq("weight"))) (result \ "size").as[Int] should be(2) - val weights = (result \\ "groupBy").map { js => + val weights = (result \ "results" \\ "groupBy").map { js => (js \ "weight").as[Int] } weights should contain(30) @@ -163,11 +166,10 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { (result \ "results").as[List[JsValue]].size should be(2) result = getEdgesSync(queryTransform(0, "[[\"weight\"]]")) - (result \\ "to").map(_.toString).sorted should be((result \\ "weight").map(_.toString).sorted) + (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "weight").map(_.toString).sorted) result = getEdgesSync(queryTransform(0, "[[\"_from\"]]")) - val results = (result \ "results").as[JsValue] - (result \\ "to").map(_.toString).sorted should be((results \\ "from").map(_.toString).sorted) + (result \ "results" \\ "to").map(_.toString).sorted should be((result \ "results" \\ "from").map(_.toString).sorted) } test("index") { @@ -217,6 +219,57 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { // } // } + + + test("duration") { + def queryDuration(ids: Seq[Int], from: Int, to: Int) = { + val $from = Json.arr( + Json.obj("serviceName" -> testServiceName, + "columnName" -> testColumnName, + "ids" -> ids)) + + val $step = Json.arr(Json.obj( + "label" -> testLabelName, "direction" -> "out", "offset" -> 0, "limit" -> 100, + "duration" -> Json.obj("from" -> from, "to" -> to))) + + val $steps = Json.arr(Json.obj("step" -> $step)) + + Json.obj("srcVertices" -> $from, "steps" -> $steps) + } + + // get all + var result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) + (result \ "results").as[List[JsValue]].size should be(4) + // inclusive, exclusive + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) + (result \ "results").as[List[JsValue]].size should be(3) + + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) + (result \ "results").as[List[JsValue]].size should be(1) + + val bulkEdges = Seq( + toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), + toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), + toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)), + toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) + ) + insertEdgesSync(bulkEdges: _*) + + // duration test after udpate + // get all + result = getEdgesSync(queryDuration(Seq(0, 2), from = 0, to = 5000)) + (result \ "results").as[List[JsValue]].size should be(4) + + // inclusive, exclusive + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 4000)) + (result \ "results").as[List[JsValue]].size should be(3) + + result = getEdgesSync(queryDuration(Seq(0, 2), from = 1000, to = 2000)) + (result \ "results").as[List[JsValue]].size should be(1) + + } + + test("return tree") { def queryParents(id: Long) = Json.parse( s""" @@ -246,7 +299,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { val src = 100 val tgt = 200 - mutateEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) + insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName)) val result = TestUtil.getEdgesSync(queryParents(src)) val parents = (result \ "results").as[Seq[JsValue]] @@ -259,54 +312,55 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { - test("pagination and _to") { - def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "${testServiceName}", - "columnName": "${testColumnName}", - "id": ${id} - }], - "steps": [ - [ { - "label": "${testLabelName}", - "direction": "out", - "offset": $offset, - "limit": $limit, - "_to": $to - } - ]] - } - """) - - val src = System.currentTimeMillis().toInt - - val bulkEdges = Seq( - toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), - toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), - toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), - toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) - ) - mutateEdgesSync(bulkEdges: _*) - - var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) - var edges = (result \ "results").as[List[JsValue]] - - edges.size should be(2) - (edges(0) \ "to").as[Long] should be(4) - (edges(1) \ "to").as[Long] should be(3) - - result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) - - edges = (result \ "results").as[List[JsValue]] - edges.size should be(2) - (edges(0) \ "to").as[Long] should be(3) - (edges(1) \ "to").as[Long] should be(2) +// test("pagination and _to") { +// def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse( +// s""" +// { "srcVertices": [ +// { "serviceName": "${testServiceName}", +// "columnName": "${testColumnName}", +// "id": ${id} +// }], +// "steps": [ +// [ { +// "label": "${testLabelName}", +// "direction": "out", +// "offset": $offset, +// "limit": $limit, +// "_to": $to +// } +// ]] +// } +// """) +// +// val src = System.currentTimeMillis().toInt +// +// val bulkEdges = Seq( +// toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)), +// toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)), +// toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)), +// toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40)) +// ) +// insertEdgesSync(bulkEdges: _*) +// +// var result = getEdgesSync(querySingle(src, offset = 0, limit = 2)) +// var edges = (result \ "results").as[List[JsValue]] +// +// edges.size should be(2) +// (edges(0) \ "to").as[Long] should be(4) +// (edges(1) \ "to").as[Long] should be(3) +// +// result = getEdgesSync(querySingle(src, offset = 1, limit = 2)) +// +// edges = (result \ "results").as[List[JsValue]] +// edges.size should be(2) +// (edges(0) \ "to").as[Long] should be(3) +// (edges(1) \ "to").as[Long] should be(2) +// +// result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) +// edges = (result \ "results").as[List[JsValue]] +// edges.size should be(1) +// } - result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1)) - edges = (result \ "results").as[List[JsValue]] - edges.size should be(1) - } test("order by") { def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj( "srcVertices" -> Json.arr( @@ -351,7 +405,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40)) ) - mutateEdgesSync(bulkEdges: _*) + insertEdgesSync(bulkEdges: _*) // get edges val edges = getEdgesSync(queryScore(0, Map("weight" -> 1))) @@ -368,39 +422,6 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { edgesTo.reverse should be(ascOrderByTo) } - test("query with '_to' option after delete") { - val from = 90210 - val to = 90211 - val inserts = Seq(toEdge(1, insert, e, from, to, testLabelName)) - mutateEdgesSync(inserts: _*) - - val deletes = Seq(toEdge(2, delete, e, from, to, testLabelName)) - mutateEdgesSync(deletes: _*) - - def queryWithTo = Json.parse( - s""" - { "srcVertices": [ - { "serviceName": "$testServiceName", - "columnName": "$testColumnName", - "id": $from - }], - "steps": [ - { - "step": [{ - "label": "$testLabelName", - "direction": "out", - "offset": 0, - "limit": 10, - "_to": $to - }] - } - ] - } - """) - val result = getEdgesSync(queryWithTo) - (result \ "results").as[List[JsValue]].size should be(0) - - } test("query with sampling") { def queryWithSampling(id: Int, sample: Int) = Json.parse( @@ -502,32 +523,324 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { toEdge(ts, insert, e, 322, 3322, testLabelName) ) - mutateEdgesSync(bulkEdges: _*) + insertEdgesSync(bulkEdges: _*) - var result = getEdgesSync(queryWithSampling(testId, sampleSize)) - println(Json.toJson(result)) - (result \ "results").as[List[JsValue]].size should be(scala.math.min(sampleSize, bulkEdges.size)) + val result1 = getEdgesSync(queryWithSampling(testId, sampleSize)) + (result1 \ "results").as[List[JsValue]].size should be(math.min(sampleSize, bulkEdges.size)) - result = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize)) - println(Json.toJson(result)) - (result \ "results").as[List[JsValue]].size should be(scala.math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) + val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize)) + (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size)) - result = getEdgesSync(twoQueryWithSampling(testId, sampleSize)) - println(Json.toJson(result)) - (result \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3 + val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize)) + (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3 + } + test("test query with filterOut query") { + def queryWithFilterOut(id1: String, id2: String) = Json.parse( + s"""{ + | "limit": 10, + | "filterOut": { + | "srcVertices": [{ + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | }], + | "steps": [{ + | "step": [{ + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | }] + | }] + | }, + | "srcVertices": [{ + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id2 + | }], + | "steps": [{ + | "step": [{ + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | }] + | }] + |} + """.stripMargin + ) + + val testId1 = "-23" + val testId2 = "-25" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), + toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), + toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), + toEdge(4, insert, e, testId2, 111, testLabelName, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 333, testLabelName, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 555, testLabelName, Json.obj(weight -> 1)) + ) + logger.debug(s"${bulkEdges.mkString("\n")}") + insertEdgesSync(bulkEdges: _*) + + val rs = getEdgesSync(queryWithFilterOut(testId1, testId2)) + logger.debug(Json.prettyPrint(rs)) + val results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + (results(0) \ "to").toString should be("555") + } - result = getEdgesSync(queryWithSampling(testId, 0)) - println(Json.toJson(result)) - (result \ "results").as[List[JsValue]].size should be(0) // edges in testLabelName2 = 3 - result = getEdgesSync(queryWithSampling(testId, 10)) - println(Json.toJson(result)) - (result \ "results").as[List[JsValue]].size should be(3) // edges in testLabelName2 = 3 + /** note that this merge two different label result into one */ + test("weighted union") { + def queryWithWeightedUnion(id1: String, id2: String) = Json.parse( + s""" + |{ + | "limit": 10, + | "weights": [ + | 10, + | 1 + | ], + | "groupBy": ["weight"], + | "queries": [ + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | }, + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id2 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | } + | ] + |} + """.stripMargin + ) - result = getEdgesSync(queryWithSampling(testId, -1)) - println(Json.toJson(result)) - (result \ "results").as[List[JsValue]].size should be(3) // edges in testLabelName2 = 3 + val testId1 = "1" + val testId2 = "2" + val bulkEdges = Seq( + toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), + toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), + toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), + toEdge(4, insert, e, testId2, 444, testLabelName2, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 666, testLabelName2, Json.obj(weight -> 1)) + ) + + insertEdgesSync(bulkEdges: _*) + + val rs = getEdgesSync(queryWithWeightedUnion(testId1, testId2)) + logger.debug(Json.prettyPrint(rs)) + val results = (rs \ "results").as[List[JsValue]] + results.size should be(2) + (results(0) \ "scoreSum").as[Float] should be(30.0) + (results(0) \ "agg").as[List[JsValue]].size should be(3) + (results(1) \ "scoreSum").as[Float] should be(3.0) + (results(1) \ "agg").as[List[JsValue]].size should be(3) + } + + test("weighted union with options") { + def queryWithWeightedUnionWithOptions(id1: String, id2: String) = Json.parse( + s""" + |{ + | "limit": 10, + | "weights": [ + | 10, + | 1 + | ], + | "groupBy": ["to"], + | "select": ["to", "weight"], + | "filterOut": { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | } + | ] + | } + | ] + | }, + | "queries": [ + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id1 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | }, + | { + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id2 + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 5 + | } + | ] + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + val testId1 = "-192848" + val testId2 = "-193849" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId1, 111, testLabelName, Json.obj(weight -> 10)), + toEdge(2, insert, e, testId1, 222, testLabelName, Json.obj(weight -> 10)), + toEdge(3, insert, e, testId1, 333, testLabelName, Json.obj(weight -> 10)), + toEdge(4, insert, e, testId2, 111, testLabelName2, Json.obj(weight -> 1)), + toEdge(5, insert, e, testId2, 333, testLabelName2, Json.obj(weight -> 1)), + toEdge(6, insert, e, testId2, 555, testLabelName2, Json.obj(weight -> 1)) + ) + + insertEdgesSync(bulkEdges: _*) + + val rs = getEdgesSync(queryWithWeightedUnionWithOptions(testId1, testId2)) + logger.debug(Json.prettyPrint(rs)) + val results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + + } + + test("scoreThreshold") { + def queryWithScoreThreshold(id: String, scoreThreshold: Int) = Json.parse( + s"""{ + | "limit": 10, + | "scoreThreshold": $scoreThreshold, + | "groupBy": ["to"], + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | } + | ] + | }, + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10 + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + val testId = "-23903" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 101, 102, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 101, 103, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 101, 104, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 102, 103, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 102, 104, testLabelName, Json.obj(weight -> 10)), + toEdge(1, insert, e, 103, 105, testLabelName, Json.obj(weight -> 10)) + ) + // expected: 104 -> 2, 103 -> 2, 102 -> 1,, 105 -> 1 + insertEdgesSync(bulkEdges: _*) + + var rs = getEdgesSync(queryWithScoreThreshold(testId, 2)) + logger.debug(Json.prettyPrint(rs)) + var results = (rs \ "results").as[List[JsValue]] + results.size should be(2) + + rs = getEdgesSync(queryWithScoreThreshold(testId, 1)) + logger.debug(Json.prettyPrint(rs)) + + results = (rs \ "results").as[List[JsValue]] + results.size should be(4) } def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( @@ -548,6 +861,23 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { } """) + def queryGlobalLimit(id: Int, limit: Int): JsValue = Json.obj( + "limit" -> limit, + "srcVertices" -> Json.arr( + Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id) + ), + "steps" -> Json.arr( + Json.obj( + "step" -> Json.arr( + Json.obj( + "label" -> testLabelName + ) + ) + ) + ) + ) + + // called by each test, each override def beforeEach = initTestData() @@ -555,7 +885,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { override def initTestData(): Unit = { super.initTestData() - mutateEdgesSync( + insertEdgesSync( toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)), toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)), toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala index bdcdb9e..aae108e 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/StrongLabelDeleteTest.scala @@ -14,7 +14,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { import TestUtil._ test("Strong consistency select") { - mutateEdgesSync(bulkEdges(): _*) + insertEdgesSync(bulkEdges(): _*) var result = getEdgesSync(query(0)) (result \ "results").as[List[JsValue]].size should be(2) @@ -56,7 +56,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { println(result) (result \ "results").as[List[JsValue]].size should be(0) - mutateEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName)) println(result) @@ -69,7 +69,8 @@ class StrongLabelDeleteTest extends IntegrateCommon { val ret = for { i <- 0 until testNum } yield { - val src = System.currentTimeMillis() + val src = (i + 1) * 10000 +// val src = System.currentTimeMillis() val (ret, last) = testInner(i, src) ret should be(true) @@ -136,7 +137,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { val allRequests = Random.shuffle(insertRequests ++ deleteRequests) // val allRequests = insertRequests ++ deleteRequests val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => - mutateEdgesAsync(bulkRequests: _*) + insertEdgesAsync(bulkRequests: _*) } Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) @@ -175,7 +176,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { } val allRequests = Random.shuffle(insertRequests ++ deleteRequests) val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests => - mutateEdgesAsync(bulkRequests: _*) + insertEdgesAsync(bulkRequests: _*) } Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) @@ -199,7 +200,7 @@ class StrongLabelDeleteTest extends IntegrateCommon { val labelName = testLabelName2 val maxTgtId = 10 val batchSize = 10 - val testNum = 3 + val testNum = 100 val numOfBatch = 10 def testInner(startTs: Long, src: Long) = { @@ -217,13 +218,13 @@ class StrongLabelDeleteTest extends IntegrateCommon { val op = if (Random.nextDouble() < 0.5) "delete" else "update" lastOps(tgt) = op - Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t") + Seq(currentTs, op, "e", src, tgt, labelName, "{}").mkString("\t") } allRequests.foreach(println(_)) val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests => - mutateEdgesAsync(bulkRequests: _*) + insertEdgesAsync(bulkRequests: _*) } Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala index 1e2d836..b80d9c7 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -76,7 +76,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { println(result) (result \ "results").as[List[JsValue]].size should be(0) - mutateEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) + insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*) result = getEdgesSync(query(20, "in", testTgtColumnName)) (result \ "results").as[List[JsValue]].size should be(3) @@ -90,7 +90,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { override def initTestData(): Unit = { super.initTestData() - mutateEdgesSync(bulkEdges(): _*) + insertEdgesSync(bulkEdges(): _*) } object WeakLabelDeleteHelper { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala deleted file mode 100644 index e5b3e51..0000000 --- a/s2core/src/test/scala/com/kakao/s2graph/core/ManagementTest.scala +++ /dev/null @@ -1,121 +0,0 @@ -//package com.kakao.s2graph.core -//import com.kakao.s2graph.core.Graph -//import com.kakao.s2graph.core.models.{HLabel, HService, HServiceColumn, HBaseModel} -//import com.typesafe.config.ConfigFactory -//import org.scalatest.{FunSuite, Matchers} -//import play.api.libs.json.{JsString, JsBoolean, JsNumber, Json} -// -//import scala.concurrent.ExecutionContext -// -///** -// * Created by shon on 5/15/15. -// */ -//class ManagementTest extends FunSuite with Matchers { -// val labelName = "test_label" -// val serviceName = "test_service" -// val columnName = "test_column" -// val columnType = "long" -// val indexProps = Seq("weight" -> JsNumber(5), "is_hidden" -> JsBoolean(true)) -// val props = Seq("is_blocked" -> JsBoolean(true), "category" -> JsString("sports")) -// val consistencyLevel = "weak" -// val hTableName = Some("testHTable") -// val hTableTTL = Some(86000) -// val preSplitSize = 10 -// val zkQuorum = "localhost" -// -// val config = ConfigFactory.parseString(s"hbase.zookeeper.quorum=$zkQuorum") -// Graph(config)(ExecutionContext.Implicits.global) -// HBaseModel(zkQuorum) -// val current = System.currentTimeMillis() -// val serviceNames = (0 until 2).map { i => s"$serviceName-${current + i}" } -// val labelNames = (0 until 2).map { i => s"$labelName-${current + i}" } -// -//// def runTC[T <: HBaseModel](prevSeq: Long, prevSize: Int, prefix: String)(testSize: Int)(createF: String => T)(fetchF: String => Option[T])(deleteF: String => Boolean) = { -//// var lastSeq = prevSeq -//// val createds = collection.mutable.Map.empty[String, T] -//// val names = (0 until testSize) map { i => s"$prefix-${current + i}"} -//// -//// val rets = for { -//// name <- names -//// } yield { -//// val created = createF(name) -//// val testSeq = created.id.get > lastSeq -//// lastSeq = created.id.get -//// createds += (name -> created) -//// val fetched = fetchF(name) -//// fetched.isDefined && created == fetched.get && testSeq -//// } -//// -//// val deletes = for { -//// name <- names -//// } yield { -//// deleteF(name) -//// } -//// -//// (rets ++ deletes).forall(_) -//// } -// test("test create service") { -// -// var prevSeq = Management.getSequence("HService") -// val prevSize = HService.findAllServices().size -// val createds = collection.mutable.Map.empty[String, HService] -// -// val rets = for { -// serviceName <- serviceNames -// } yield { -// val service = Management.createService(serviceName, zkQuorum, hTableName.get, preSplitSize, hTableTTL) -// val testSeq = service.id.get > prevSeq -// prevSeq = service.id.get -// createds += (service.serviceName -> service) -// val other = Management.findService(service.serviceName) -// other.isDefined && service == other.get && testSeq -// } -// -// val deletes = for { -// serviceName <- serviceNames -// } yield { -// Management.deleteService(serviceName) -// } -// (rets ++ deletes).forall(_) -// -// HService.findAllServices().size == prevSize -// } -// test("test create label") { -// val service = Management.createService(serviceName, zkQuorum, hTableName.get, preSplitSize, hTableTTL) -// var prevSeq = Management.getSequence("HLabel") -// val prevSize = HLabel.findAll(useCache = false) -// val createds = collection.mutable.Map.empty[String, HLabel] -// -// val rets = for { -// lName <- labelNames -// } yield { -// val label = Management.createLabel(lName, serviceName, columnName, columnType, -// serviceName, columnName, columnType, -// true, serviceName, indexProps, props, -// consistencyLevel, hTableName, hTableTTL -// ) -// val testSeq = label.id.get > prevSeq -// prevSeq = label.id.get -// -// createds += (label.label -> label) -// val other = Management.findLabel(label.label) -// other.isDefined && label == other.get && testSeq -// } -// println(HLabel.findAll(useCache = false)) -// val deletes = for { -// lName <- labelNames -// } yield { -// Management.deleteLabel(lName) -// } -// (rets ++ deletes).forall(_) -// HLabel.findAll(useCache = false).size == prevSize -// } -// test("test update label") { -// HLabel.updateLabel(labelName, Seq("is_blocked" -> JsBoolean(false))) -// for { -// label <- HLabel.findByName(labelName, useCache = false) -// } yield { -// println(label) -// } -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala index b431cc2..febad3d 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala @@ -12,7 +12,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { // dummy data for dummy edge initTests() - + import HBaseType.{VERSION1, VERSION2} val ts = System.currentTimeMillis() @@ -167,6 +167,58 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { } } + test("replace reserved") { + val ts = 0 + import com.kakao.s2graph.core.rest.TemplateHelper._ + + calculate(ts, 1, "hour") should be(hour + ts) + calculate(ts, 1, "day") should be(day + ts) + + calculate(ts + 10, 1, "HOUR") should be(hour + ts + 10) + calculate(ts + 10, 1, "DAY") should be(day + ts + 10) + + val body = """{ + "day": ${1day}, + "hour": ${1hour}, + "-day": "${-10 day}", + "-hour": ${-10 hour}, + "now": "${now}" + } + """ + + val parsed = replaceVariable(ts, body) + val json = Json.parse(parsed) + + (json \ "day").as[Long] should be (1 * day + ts) + (json \ "hour").as[Long] should be (1 * hour + ts) + + (json \ "-day").as[Long] should be (-10 * day + ts) + (json \ "-hour").as[Long] should be (-10 * hour + ts) + + (json \ "now").as[Long] should be (ts) + + val otherBody = """{ + "nextday": "${next_day}", + "3dayago": "${next_day - 3 day}", + "nexthour": "${next_hour}" + }""" + + val currentTs = System.currentTimeMillis() + val expectedDayTs = currentTs / day * day + day + val expectedHourTs = currentTs / hour * hour + hour + val threeDayAgo = expectedDayTs - 3 * day + val currentTsLs = (1 until 1000).map(currentTs + _) + + currentTsLs.foreach { ts => + val parsed = replaceVariable(ts, otherBody) + val json = Json.parse(parsed) + + (json \ "nextday").as[Long] should be(expectedDayTs) + (json \ "nexthour").as[Long] should be(expectedHourTs) + (json \ "3dayago").as[Long] should be(threeDayAgo) + } + } + // test("time decay") { // val ts = System.currentTimeMillis() // http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala index 38b5b99..128f2d7 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala @@ -1,53 +1,53 @@ -package com.kakao.s2graph.core.storage.hbase - -import com.kakao.s2graph.core.Graph -import com.typesafe.config.ConfigFactory - -import org.apache.hadoop.hbase.util.Bytes -import org.hbase.async.GetRequest -import org.scalatest.{FunSuite, Matchers} - -import scala.concurrent.ExecutionContext - -class AsynchbaseQueryBuilderTest extends FunSuite with Matchers { - - val dummyRequests = { - for { - id <- 0 until 1000 - } yield { - new GetRequest("a", Bytes.toBytes(id)) - } - } - - implicit val ec = ExecutionContext.Implicits.global - val config = ConfigFactory.load() - val graph = new Graph(config) - - val qb = new AsynchbaseQueryBuilder(graph.storage.asInstanceOf[AsynchbaseStorage]) - - test("test toCacheKeyBytes") { - val startedAt = System.nanoTime() - - for { - i <- dummyRequests.indices - x = qb.toCacheKeyBytes(dummyRequests(i)) - } { - for { - j <- dummyRequests.indices if i != j - y = qb.toCacheKeyBytes(dummyRequests(j)) - } { - x should not equal y - } - } - - dummyRequests.zip(dummyRequests).foreach { case (x, y) => - val xHash = qb.toCacheKeyBytes(x) - val yHash = qb.toCacheKeyBytes(y) - // println(xHash, yHash) - xHash should be(yHash) - } - val duration = System.nanoTime() - startedAt - - println(s">> bytes: $duration") - } -} +//package com.kakao.s2graph.core.storage.hbase +// +//import com.kakao.s2graph.core.Graph +//import com.typesafe.config.ConfigFactory +// +//import org.apache.hadoop.hbase.util.Bytes +//import org.hbase.async.GetRequest +//import org.scalatest.{FunSuite, Matchers} +// +//import scala.concurrent.ExecutionContext +// +//class AsynchbaseQueryBuilderTest extends FunSuite with Matchers { +// +// val dummyRequests = { +// for { +// id <- 0 until 1000 +// } yield { +// new GetRequest("a", Bytes.toBytes(id)) +// } +// } +// +// implicit val ec = ExecutionContext.Implicits.global +// val config = ConfigFactory.load() +// val graph = new Graph(config) +// +// val qb = new AsynchbaseQueryBuilder(graph.storage.asInstanceOf[AsynchbaseStorage]) +// +// test("test toCacheKeyBytes") { +// val startedAt = System.nanoTime() +// +// for { +// i <- dummyRequests.indices +// x = qb.toCacheKeyBytes(dummyRequests(i)) +// } { +// for { +// j <- dummyRequests.indices if i != j +// y = qb.toCacheKeyBytes(dummyRequests(j)) +// } { +// x should not equal y +// } +// } +// +// dummyRequests.zip(dummyRequests).foreach { case (x, y) => +// val xHash = qb.toCacheKeyBytes(x) +// val yHash = qb.toCacheKeyBytes(y) +// // println(xHash, yHash) +// xHash should be(yHash) +// } +// val duration = System.nanoTime() - startedAt +// +// println(s">> bytes: $duration") +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala index 32e3d0c..a36b55f 100644 --- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala +++ b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala @@ -128,7 +128,7 @@ object CounterFunctions extends Logging with WithKafka { itemRankingRdd.unpersist(false) } } - + def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = { rdd.mapPartitions { part => assert(initialize) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala new file mode 100644 index 0000000..4970399 --- /dev/null +++ b/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala @@ -0,0 +1,196 @@ +package s2.counter.stream + +import com.kakao.s2graph.core.GraphUtil +import com.kakao.s2graph.core.mysqls.Label +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import play.api.libs.json.Json +import s2.config.{S2ConfigFactory, S2CounterConfig} +import s2.counter.core.CounterFunctions.HashMapAccumulable +import s2.counter.core.TimedQualifier.IntervalUnit +import s2.counter.core._ +import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph} +import s2.helper.CounterAdmin +import s2.models.{Counter, DBModel, DefaultCounterModel} +import s2.spark.HashMapParam + +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success} + +/** + * Created by hsleep([email protected]) on 2015. 11. 19.. + */ +class ExactCounterStreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + private val master = "local[2]" + private val appName = "exact_counter_streaming" + private val batchDuration = Seconds(1) + + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + + val admin = new CounterAdmin(S2ConfigFactory.config) + val graphOp = new GraphOperation(S2ConfigFactory.config) + val s2config = new S2CounterConfig(S2ConfigFactory.config) + + val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) + val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) + + val service = "test" + val action = "test_case" + + override def beforeAll(): Unit = { + DBModel.initialize(S2ConfigFactory.config) + + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + ssc = new StreamingContext(conf, batchDuration) + + sc = ssc.sparkContext + + // create test_case label + com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") + if (Label.findByName(action, useCache = false).isEmpty) { + val strJs = + s""" + |{ + | "label": "$action", + | "srcServiceName": "$service", + | "srcColumnName": "src", + | "srcColumnType": "string", + | "tgtServiceName": "$service", + | "tgtColumnName": "$action", + | "tgtColumnType": "string", + | "indices": [ + | ], + | "props": [ + | ] + |} + """.stripMargin + graphOp.createLabel(Json.parse(strJs)) + } + + // action + admin.deleteCounter(service, action).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = true)) + } + + override def afterAll(): Unit = { + admin.deleteCounter(service, action) + if (ssc != null) { + ssc.stop() + } + } + + "ExactCounter" should "update" in { + val policy = DefaultCounterModel.findByServiceAction(service, action).get + val data = + s""" + |1434534565675 $service $action 70362200_94013572857366866 {"is_shared":"false","relationship":"FE"} {"userId":"48255079","userIdType":"profile_id","value":"1"} + |1434534565675 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"48255079","userIdType":"profile_id","value":"1"} + |1434534566220 $service $action 51223360_94013140590929619 {"is_shared":"false","relationship":"FE"} {"userId":"312383","userIdType":"profile_id","value":"1"} + |1434534566508 $service $action 63808459_94013420047377826 {"is_shared":"false","relationship":"FE"} {"userId":"21968241","userIdType":"profile_id","value":"1"} + |1434534566210 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"6062217","userIdType":"profile_id","value":"1"} + |1434534566459 $service $action 49699692_94012186431261763 {"is_shared":"false","relationship":"FE"} {"userId":"67863471","userIdType":"profile_id","value":"1"} + |1434534565681 $service $action 64556827_94012311028641810 {"is_shared":"false","relationship":"FE"} {"userId":"19381218","userIdType":"profile_id","value":"1"} + |1434534565865 $service $action 41814266_94012477588942163 {"is_shared":"false","relationship":"FE"} {"userId":"19268547","userIdType":"profile_id","value":"1"} + |1434534565865 $service $action 66697741_94007840665633458 {"is_shared":"false","relationship":"FE"} {"userId":"19268547","userIdType":"profile_id","value":"1"} + |1434534566142 $service $action 66444074_94012737377133826 {"is_shared":"false","relationship":"FE"} {"userId":"11917195","userIdType":"profile_id","value":"1"} + |1434534566077 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"37709890","userIdType":"profile_id","value":"1"} + |1434534565938 $service $action 40921487_94012905738975266 {"is_shared":"false","relationship":"FE"} {"userId":"59869223","userIdType":"profile_id","value":"1"} + |1434534566033 $service $action 64506628_93994707216829506 {"is_shared":"false","relationship":"FE"} {"userId":"50375575","userIdType":"profile_id","value":"1"} + |1434534566451 $service $action 40748868_94013448321919139 {"is_shared":"false","relationship":"FE"} {"userId":"12249539","userIdType":"profile_id","value":"1"} + |1434534566669 $service $action 64499956_94013227717457106 {"is_shared":"false","relationship":"FE"} {"userId":"25167419","userIdType":"profile_id","value":"1"} + |1434534566669 $service $action 66444074_94012737377133826 {"is_shared":"false","relationship":"FE"} {"userId":"25167419","userIdType":"profile_id","value":"1"} + |1434534566318 $service $action 64774665_94012837889027027 {"is_shared":"true","relationship":"F"} {"userId":"71557816","userIdType":"profile_id","value":"1"} + |1434534566274 $service $action 67075480_94008509166933763 {"is_shared":"false","relationship":"FE"} {"userId":"57931860","userIdType":"profile_id","value":"1"} + |1434534566659 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"19990823","userIdType":"profile_id","value":"1"} + |1434534566250 $service $action 70670053_93719933175630611 {"is_shared":"true","relationship":"F"} {"userId":"68897412","userIdType":"profile_id","value":"1"} + |1434534566402 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"15541439","userIdType":"profile_id","value":"1"} + |1434534566122 $service $action 48890741_94013463616012786 {"is_shared":"false","relationship":"FE"} {"userId":"48040409","userIdType":"profile_id","value":"1"} + |1434534566055 $service $action 64509008_94002318232678546 {"is_shared":"true","relationship":"F"} {"userId":"46532039","userIdType":"profile_id","value":"1"} + |1434534565994 $service $action 66644368_94009163363033795 {"is_shared":"false","relationship":"FE"} {"userId":"4143147","userIdType":"profile_id","value":"1"} + |1434534566448 $service $action 64587644_93938555963733954 {"is_shared":"false","relationship":"FE"} {"userId":"689042","userIdType":"profile_id","value":"1"} + |1434534565935 $service $action 52812511_94012009551561315 {"is_shared":"false","relationship":"FE"} {"userId":"35509692","userIdType":"profile_id","value":"1"} + |1434534566544 $service $action 70452048_94008573197583762 {"is_shared":"false","relationship":"FE"} {"userId":"5172421","userIdType":"profile_id","value":"1"} + |1434534565929 $service $action 54547023_94013384964278435 {"is_shared":"false","relationship":"FE"} {"userId":"33556498","userIdType":"profile_id","value":"1"} + |1434534566358 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"8987346","userIdType":"profile_id","value":"1"} + |1434534566057 $service $action 67075480_94008509166933763 {"is_shared":"false","relationship":"FE"} {"userId":"35134964","userIdType":"profile_id","value":"1"} + |1434534566140 $service $action 54547023_94013384964278435 {"is_shared":"false","relationship":"FE"} {"userId":"11900315","userIdType":"profile_id","value":"1"} + |1434534566158 $service $action 64639374_93888330176053635 {"is_shared":"true","relationship":"F"} {"userId":"49996643","userIdType":"profile_id","value":"1"} + |1434534566025 $service $action 67265128_94009084771192002 {"is_shared":"false","relationship":"FE"} {"userId":"37801480","userIdType":"profile_id","value":"1"} + """.stripMargin.trim + // println(data) + val rdd = sc.parallelize(Seq(("", data))) + + // rdd.foreachPartition { part => + // part.foreach(println) + // } + val resultRdd = CounterFunctions.makeExactRdd(rdd, 2) + val result = resultRdd.collect().toMap + + // result.foreachPartition { part => + // part.foreach(println) + // } + + val parsed = { + for { + line <- GraphUtil.parseString(data) + item <- CounterEtlItem(line).toSeq + ev <- CounterFunctions.exactMapper(item).toSeq + } yield { + ev + } + } + val parsedResult = parsed.groupBy(_._1).mapValues(values => values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L))) + + // parsedResult.foreach { case (k, v) => + // println(k, v) + // } + + result should not be empty + result should equal (parsedResult) + + val itemId = "46889329_94013502934177075" + val key = ExactKey(DefaultCounterModel.findByServiceAction(service, action).get, itemId, checkItemType = true) + val value = result.get(key) + + value should not be empty + value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, String])) should equal (Some(6L)) + + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (None) + + val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + resultRdd.foreachPartition { part => + CounterFunctions.updateExactCounter(part.toSeq, acc) + } + + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map.empty[String, String]) -> Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (Some(expected)) + } + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"))) should be (Some(expected)) + } + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("relationship" -> Set("FE"))) should be (Some(expected)) + } + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.relationship.false.FE") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be (Some(expected)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/build.sbt ---------------------------------------------------------------------- diff --git a/s2rest_netty/build.sbt b/s2rest_netty/build.sbt new file mode 100644 index 0000000..666fd80 --- /dev/null +++ b/s2rest_netty/build.sbt @@ -0,0 +1,7 @@ +name := "s2rest_netty" + +enablePlugins(JavaAppPackaging) + +libraryDependencies ++= Seq( + "io.netty" % "netty-all" % "4.0.33.Final" +) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/conf/logger.xml ---------------------------------------------------------------------- diff --git a/s2rest_netty/conf/logger.xml b/s2rest_netty/conf/logger.xml new file mode 100644 index 0000000..2d767c2 --- /dev/null +++ b/s2rest_netty/conf/logger.xml @@ -0,0 +1,83 @@ +<configuration> + + <conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel"/> + + <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>logs/application.log</file> + + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>logs/application.%i.log</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>9</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>500MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern> + </encoder> + </appender> + + + <appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>logs/error.log</file> + <append>true</append> + <encoder> + <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>logs/error.log.%d.%i</fileNamePattern> + <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> + <maxFileSize>500MB</maxFileSize> + </timeBasedFileNamingAndTriggeringPolicy> + <maxHistory>3</maxHistory> + </rollingPolicy> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%coloredLevel %logger{15} - %message%n%xException%n</pattern> + </encoder> + </appender> + + <appender name="ACTOR" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>logs/actor.log</file> + <append>true</append> + <encoder> + <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>logs/actor.log.%d.%i</fileNamePattern> + <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> + <maxFileSize>200MB</maxFileSize> + </timeBasedFileNamingAndTriggeringPolicy> + <maxHistory>7</maxHistory> + </rollingPolicy> + </appender> + + <logger name="play" level="INFO"> + <appender-ref ref="STDOUT"/> + <appender-ref ref="FILE"/> + </logger> + + <logger name="application" level="INFO"> + <appender-ref ref="STDOUT"/> + <appender-ref ref="FILE"/> + </logger> + + <logger name="error" level="INFO"> + <appender-ref ref="STDOUT"/> + <appender-ref ref="ERROR"/> + </logger> + + <logger name="actor" level="INFO"> + <appender-ref ref="ACTOR"/> + </logger> + + <logger name="akka" level="INFO"> + <appender-ref ref="STDOUT"/> + <appender-ref ref="FILE"/> + </logger> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/conf/reference.conf ---------------------------------------------------------------------- diff --git a/s2rest_netty/conf/reference.conf b/s2rest_netty/conf/reference.conf new file mode 100644 index 0000000..a992e5d --- /dev/null +++ b/s2rest_netty/conf/reference.conf @@ -0,0 +1,131 @@ +# This is the main configuration file for the application. +# ~~~~~ + +# Secret key +# ~~~~~ +# The secret key is used to secure cryptographics functions. +# +# This must be changed for production, but we recommend not changing it in this file. +# +# See http://www.playframework.com/documentation/latest/ApplicationSecret for more details. +application.secret="/`==g^yr2DNnZGK_L^rguLZeR`60uLOVgY@OhyTv:maatl:Tl>9or/d1xME3b/Pi" + +# The application languages +# ~~~~~ +application.langs="en" + +# Global object class +# ~~~~~ +# Define the Global object class for this application. +# Default to Global in the root package. +# application.global=Global + +# Router +# ~~~~~ +# Define the Router object to use for this application. +# This router will be looked up first when the application is starting up, +# so make sure this is the entry point. +# Furthermore, it's assumed your route file is named properly. +# So for an application router like `my.application.Router`, +# you may need to define a router file `conf/my.application.routes`. +# Default to Routes in the root package (and conf/routes) +# application.router=my.application.Routes + +# Database configuration +# ~~~~~ +# You can declare as many datasources as you want. +# By convention, the default datasource is named `default` +# +# db.default.driver=org.h2.Driver +# db.default.url="jdbc:h2:mem:play" +# db.default.user=sa +# db.default.password="" + +# Evolutions +# ~~~~~ +# You can disable evolutions if needed +# evolutionplugin=disabled + +# Logger +# ~~~~~ +# You can also configure logback (http://logback.qos.ch/), +# by providing an application-logger.xml file in the conf directory. + +# Root logger: +logger.root=ERROR + +# Logger used by the framework: +logger.play=INFO + +# Logger provided to your application: +logger.application=DEBUG + +# APP PHASE +phase=dev +host=localhost + +# DB +s2graph.models.table.name="models-dev" +hbase.zookeeper.quorum=${host} +db.default.url="jdbc:mysql://"${host}":3306/graph_dev" +# Query server +is.query.server=true +is.write.server=true +query.hard.limit=100000 + +# Local Cache +cache.ttl.seconds=60 +cache.max.size=100000 + +# HBASE +#hbase.client.operation.timeout=1000 +#async.hbase.client.flush.interval=100 +hbase.table.compression.algorithm="gz" + +# Asynchbase +hbase.client.retries.number=100 +hbase.rpcs.buffered_flush_interval=100 +hbase.rpc.timeout=0 +#hbase.nsre.high_watermark=1000000 +#hbase.timer.tick=5 +#hbase.timer.ticks_per_wheel=5 + +# Kafka +kafka.metadata.broker.list=${host} +kafka.producer.pool.size=0 + +# HTTP +parsers.text.maxLength=512K +parsers.json.maxLength=512K +trustxforwarded=false + +# Local Queue Actor +local.queue.actor.max.queue.size=100000 +local.queue.actor.rate.limit=1000000 + +# local retry number +max.retry.number=100 +max.back.off=50 +delete.all.fetch.size=10000 +hbase.fail.prob=-1.0 + +# max allowd edges for deleteAll is multiply of above two configuration. + +# set global obejct package, TODO: remove global +application.global=com.kakao.s2graph.rest.Global + +akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "DEBUG" +} + + +# Future cache. +future.cache.max.size=100000 +future.cache.expire.after.write=10000 +future.cache.expire.after.access=5000 + + +# Counter +redis.instances = [${host}] + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/resources/application.conf b/s2rest_netty/src/main/resources/application.conf new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/src/main/resources/reference.conf ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/resources/reference.conf b/s2rest_netty/src/main/resources/reference.conf new file mode 100644 index 0000000..a992e5d --- /dev/null +++ b/s2rest_netty/src/main/resources/reference.conf @@ -0,0 +1,131 @@ +# This is the main configuration file for the application. +# ~~~~~ + +# Secret key +# ~~~~~ +# The secret key is used to secure cryptographics functions. +# +# This must be changed for production, but we recommend not changing it in this file. +# +# See http://www.playframework.com/documentation/latest/ApplicationSecret for more details. +application.secret="/`==g^yr2DNnZGK_L^rguLZeR`60uLOVgY@OhyTv:maatl:Tl>9or/d1xME3b/Pi" + +# The application languages +# ~~~~~ +application.langs="en" + +# Global object class +# ~~~~~ +# Define the Global object class for this application. +# Default to Global in the root package. +# application.global=Global + +# Router +# ~~~~~ +# Define the Router object to use for this application. +# This router will be looked up first when the application is starting up, +# so make sure this is the entry point. +# Furthermore, it's assumed your route file is named properly. +# So for an application router like `my.application.Router`, +# you may need to define a router file `conf/my.application.routes`. +# Default to Routes in the root package (and conf/routes) +# application.router=my.application.Routes + +# Database configuration +# ~~~~~ +# You can declare as many datasources as you want. +# By convention, the default datasource is named `default` +# +# db.default.driver=org.h2.Driver +# db.default.url="jdbc:h2:mem:play" +# db.default.user=sa +# db.default.password="" + +# Evolutions +# ~~~~~ +# You can disable evolutions if needed +# evolutionplugin=disabled + +# Logger +# ~~~~~ +# You can also configure logback (http://logback.qos.ch/), +# by providing an application-logger.xml file in the conf directory. + +# Root logger: +logger.root=ERROR + +# Logger used by the framework: +logger.play=INFO + +# Logger provided to your application: +logger.application=DEBUG + +# APP PHASE +phase=dev +host=localhost + +# DB +s2graph.models.table.name="models-dev" +hbase.zookeeper.quorum=${host} +db.default.url="jdbc:mysql://"${host}":3306/graph_dev" +# Query server +is.query.server=true +is.write.server=true +query.hard.limit=100000 + +# Local Cache +cache.ttl.seconds=60 +cache.max.size=100000 + +# HBASE +#hbase.client.operation.timeout=1000 +#async.hbase.client.flush.interval=100 +hbase.table.compression.algorithm="gz" + +# Asynchbase +hbase.client.retries.number=100 +hbase.rpcs.buffered_flush_interval=100 +hbase.rpc.timeout=0 +#hbase.nsre.high_watermark=1000000 +#hbase.timer.tick=5 +#hbase.timer.ticks_per_wheel=5 + +# Kafka +kafka.metadata.broker.list=${host} +kafka.producer.pool.size=0 + +# HTTP +parsers.text.maxLength=512K +parsers.json.maxLength=512K +trustxforwarded=false + +# Local Queue Actor +local.queue.actor.max.queue.size=100000 +local.queue.actor.rate.limit=1000000 + +# local retry number +max.retry.number=100 +max.back.off=50 +delete.all.fetch.size=10000 +hbase.fail.prob=-1.0 + +# max allowd edges for deleteAll is multiply of above two configuration. + +# set global obejct package, TODO: remove global +application.global=com.kakao.s2graph.rest.Global + +akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "DEBUG" +} + + +# Future cache. +future.cache.max.size=100000 +future.cache.expire.after.write=10000 +future.cache.expire.after.access=5000 + + +# Counter +redis.instances = [${host}] + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_netty/src/main/scala/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/Server.scala b/s2rest_netty/src/main/scala/Server.scala index 7290b8a..16477b1 100644 --- a/s2rest_netty/src/main/scala/Server.scala +++ b/s2rest_netty/src/main/scala/Server.scala @@ -1,9 +1,12 @@ package com.kakao.s2graph.rest.netty +import java.util.Map.Entry import java.util.concurrent.Executors +import java.util.function.Consumer import com.kakao.s2graph.core.GraphExceptions.BadQueryException import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls.Experiment import com.kakao.s2graph.core.rest.RestHandler.HandlerResult import com.kakao.s2graph.core.rest._ import com.kakao.s2graph.core.utils.Extensions._ @@ -88,11 +91,11 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends case e: BadQueryException => logger.error(s"{$requestBody}, ${e.getMessage}", e) val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result()) + simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) case e: Exception => logger.error(s"${requestBody}, ${e.getMessage}", e) val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8) - simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result()) + simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) } } } @@ -130,11 +133,10 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends } else badRoute(ctx) case HttpMethod.POST => - val jsonString = req.content.toString(CharsetUtil.UTF_8) - val jsQuery = Json.parse(jsonString) + val body = req.content.toString(CharsetUtil.UTF_8) - val result = s2rest.doPost(uri, jsQuery) - toResponse(ctx, req, jsQuery, result, startedAt) + val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey))) + toResponse(ctx, req, Json.parse(body), result, startedAt) case _ => simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/app/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala index 8005f79..6ce3ac4 100644 --- a/s2rest_play/app/Bootstrap.scala +++ b/s2rest_play/app/Bootstrap.scala @@ -7,7 +7,7 @@ import com.kakao.s2graph.core.rest._ import com.kakao.s2graph.core.utils.logger import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph} import config.Config -import controllers.{AdminController, ApplicationController} +import controllers.{ApplicationController} import play.api.Application import play.api.mvc.{WithFilters, _} import play.filters.gzip.GzipFilter http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/app/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/EdgeController.scala b/s2rest_play/app/controllers/EdgeController.scala index 2b3b32e..a8e1a41 100644 --- a/s2rest_play/app/controllers/EdgeController.scala +++ b/s2rest_play/app/controllers/EdgeController.scala @@ -1,11 +1,9 @@ package controllers import actors.QueueActor -import com.kakao.s2graph.core.GraphExceptions.BadQueryException import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.{LabelMeta, Label} +import com.kakao.s2graph.core.mysqls.{Label} import com.kakao.s2graph.core.rest.RequestParser -import com.kakao.s2graph.core.types.LabelWithDirection import com.kakao.s2graph.core.utils.logger import config.Config import org.apache.kafka.clients.producer.ProducerRecord @@ -14,7 +12,6 @@ import play.api.mvc.{Controller, Result} import scala.collection.Seq import scala.concurrent.Future -import scala.util.{Failure, Success} object EdgeController extends Controller { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/build.sbt ---------------------------------------------------------------------- diff --git a/s2rest_play/build.sbt b/s2rest_play/build.sbt old mode 100755 new mode 100644 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala b/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala index bf24ed7..a387ba5 100644 --- a/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala +++ b/s2rest_play/test/benchmark/JsonBenchmarkSpec.scala @@ -4,43 +4,43 @@ import play.api.libs.json.JsNumber import play.api.test.{FakeApplication, PlaySpecification, WithApplication} import play.libs.Json -class JsonBenchmarkSpec extends BenchmarkCommon with PlaySpecification { - "to json" should { - implicit val app = FakeApplication() - - "json benchmark" in new WithApplication(app) { +class JsonBenchmarkSpec extends BenchmarkCommon { + "to json" >> { + "json benchmark" >> { duration("map to json") { - (0 to 100) foreach { n => + (0 to 10) foreach { n => val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap Json.toJson(numberMaps) } } duration("directMakeJson") { - (0 to 100) foreach { n => + (0 to 10) foreach { n => var jsObj = play.api.libs.json.Json.obj() - (0 to 100).foreach { n => + (0 to 10).foreach { n => jsObj += (n.toString -> JsNumber(n * n)) } } } duration("map to json 2") { - (0 to 500) foreach { n => - val numberMaps = (0 to 100).map { n => (n.toString -> JsNumber(n * n)) }.toMap + (0 to 50) foreach { n => + val numberMaps = (0 to 10).map { n => (n.toString -> JsNumber(n * n)) }.toMap Json.toJson(numberMaps) } } duration("directMakeJson 2") { - (0 to 500) foreach { n => + (0 to 50) foreach { n => var jsObj = play.api.libs.json.Json.obj() - (0 to 100).foreach { n => + (0 to 10).foreach { n => jsObj += (n.toString -> JsNumber(n * n)) } } } + true } + true } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala b/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala index d38ff5e..d2d3624 100644 --- a/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala +++ b/s2rest_play/test/benchmark/OrderingUtilBenchmarkSpec.scala @@ -8,10 +8,7 @@ import play.api.{Application => PlayApplication} import scala.util.Random -/** - * Created by hsleep([email protected]) on 2015. 11. 9.. - */ -class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { +class OrderingUtilBenchmarkSpec extends BenchmarkCommon { "OrderingUtilBenchmarkSpec" should { "performance MultiOrdering any" >> { @@ -24,7 +21,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { } val sorted1 = duration("TupleOrdering double,long") { - (0 until 10000) foreach { _ => + (0 until 1000) foreach { _ => tupLs.sortBy { case (x, y) => -x -> -y } @@ -35,7 +32,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { }.map { x => x._1 } val sorted2 = duration("MultiOrdering double,long") { - (0 until 10000) foreach { _ => + (0 until 1000) foreach { _ => seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false))) } seqLs.sorted(new SeqMultiOrdering[Any](Seq(false, false))) @@ -45,7 +42,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { } "performance MultiOrdering double" >> { - val tupLs = (0 until 500) map { i => + val tupLs = (0 until 50) map { i => Random.nextDouble() -> Random.nextDouble() } @@ -54,13 +51,13 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { } duration("MultiOrdering double") { - (0 until 10000) foreach { _ => + (0 until 1000) foreach { _ => seqLs.sorted(new SeqMultiOrdering[Double](Seq(false, false))) } } duration("TupleOrdering double") { - (0 until 10000) foreach { _ => + (0 until 1000) foreach { _ => tupLs.sortBy { case (x, y) => -x -> -y } @@ -71,7 +68,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { } "performance MultiOrdering jsvalue" >> { - val tupLs = (0 until 500) map { i => + val tupLs = (0 until 50) map { i => Random.nextDouble() -> Random.nextLong() } @@ -80,7 +77,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { } val sorted1 = duration("TupleOrdering double,long") { - (0 until 10000) foreach { _ => + (0 until 1000) foreach { _ => tupLs.sortBy { case (x, y) => -x -> -y } @@ -91,7 +88,7 @@ class OrderingUtilBenchmarkSpec extends BenchmarkCommon with PlaySpecification { } val sorted2 = duration("MultiOrdering jsvalue") { - (0 until 10000) foreach { _ => + (0 until 1000) foreach { _ => seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false))) } seqLs.sorted(new SeqMultiOrdering[JsValue](Seq(false, false)))
