Repository: incubator-s2graph Updated Branches: refs/heads/master 0c5df5eff -> e45d69d3c
[S2GRAPH-45]: Provide way to call specific bucket on experiment. Pass impressionId from the header into s2core so the user can specify a specific bucket under the experiment. JIRA: [S2GRAPH-45] https://issues.apache.org/jira/browse/S2GRAPH-45 Pull Request: Closes #29 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e45d69d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e45d69d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e45d69d3 Branch: refs/heads/master Commit: e45d69d3c0396d7cbc13d4bc9193e035cbda8065 Parents: 0c5df5e Author: DO YUNG YOON <[email protected]> Authored: Tue Feb 23 20:08:51 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Feb 23 20:08:51 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../kakao/s2graph/core/mysqls/Experiment.scala | 14 +++--- .../kakao/s2graph/core/rest/RestHandler.scala | 46 ++++++++++---------- .../app/controllers/ApplicationController.scala | 9 ++-- .../app/controllers/ExperimentController.scala | 9 ++-- .../app/controllers/JsonBodyParser.scala | 25 ++++++++--- .../app/controllers/QueryController.scala | 44 ++++++++++--------- 7 files changed, 88 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 602eb0d..b7c2366 100644 --- a/CHANGES +++ b/CHANGES @@ -12,6 +12,8 @@ Release 0.12.1 - unreleased S2GRAPH-32: Support variable such as now, day, hour on query (Committed by DOYUNG YOON). + S2GRAPH-45: Provide way to call specific bucket on experiment (Committed by DOYUNG YOON). + IMPROVEMENT S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON). 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala index d484914..88c85b7 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala @@ -56,12 +56,16 @@ case class Experiment(id: Option[Int], } yield range -> bucket - def findBucket(uuid: String): Option[Bucket] = { - val seed = experimentType match { - case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1 - case _ => (Random.nextInt(totalModular)) + 1 + def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] = { + impIdOpt match { + case Some(impId) => Bucket.findByImpressionId(impId) + case None => + val seed = experimentType match { + case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1 + case _ => Random.nextInt(totalModular) + 1 + } + findBucket(seed) } - findBucket(seed) } def findBucket(uuidMod: Int): Option[Bucket] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala index 9647314..a9424d0 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala @@ -23,14 +23,15 @@ object RestHandler { class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { import RestHandler._ - - val s2Parser = new RequestParser(graph.config) + val requestParser = new RequestParser(graph.config) 
/** * Public APIS */ - def doPost(uri: String, jsQuery: JsValue): HandlerResult = { + def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = { try { + val jsQuery = Json.parse(body) + uri match { case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)) @@ -43,7 +44,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery)) case uri if uri.startsWith("/graphs/experiment") => val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3) - experiment(jsQuery, accessToken, experimentName, uuid) + experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt) case _ => throw new RuntimeException("route is not found") } } catch { @@ -54,7 +55,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { // TODO: Refactor to doGet def checkEdges(jsValue: JsValue): HandlerResult = { try { - val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue) + val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue) HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs => val edgeJsons = for { @@ -77,12 +78,13 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { /** * Private APIS */ - private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String): HandlerResult = { + private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = { + try { val bucketOpt = for { service <- Service.findByAccessToken(accessToken) experiment <- Experiment.findBy(service.id.get, experimentName) - bucket <- experiment.findBucket(uuid) + bucket <- experiment.findBucket(uuid, impKeyOpt) } yield bucket val bucket = bucketOpt.getOrElse(throw new 
RuntimeException("bucket is not found")) @@ -99,16 +101,15 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = { if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults)) else { - val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid) + val body = buildRequestBody(Option(contentsBody), bucket, uuid) val url = new URL(bucket.apiPath) - val path = url.getPath() + val path = url.getPath // dummy log for sampling - val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody" - - logger.info(experimentLog) + val experimentLog = s"POST $path took -1 ms 200 -1 $body" + logger.debug(experimentLog) - doPost(path, jsonBody) + doPost(path, body) } } @@ -132,15 +133,15 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { val fetch = eachQuery(post) _ jsonQuery match { - case JsArray(arr) => Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray) - case obj@JsObject(_) => fetch(s2Parser.toQuery(obj)) + case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray) + case obj@JsObject(_) => fetch(requestParser.toQuery(obj)) case _ => throw BadQueryException("Cannot support") } } private def getEdgesExcludedAsync(jsonQuery: JsValue) (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { - val q = s2Parser.toQuery(jsonQuery) + val q = requestParser.toQuery(jsonQuery) val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) val fetchFuture = graph.getEdges(q) @@ -170,9 +171,13 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) } } - - private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = { + private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): 
String = { var body = bucket.requestBody.replace("#uuid", uuid) + + // // replace variable + // body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body) + + // replace param for { requestKeyJson <- requestKeyJsonOpt jsObj <- requestKeyJson.asOpt[JsObject] @@ -185,10 +190,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { body = body.replace(key, replacement) } - Try(Json.parse(body)).recover { - case e: Exception => - throw new BadQueryException(s"wrong or missing template parameter: ${e.getMessage.takeWhile(_ != '\n')}") - } get + body } def calcSize(js: JsValue): Int = js match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/ApplicationController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/ApplicationController.scala b/s2rest_play/app/controllers/ApplicationController.scala index d9384d9..5f54edd 100644 --- a/s2rest_play/app/controllers/ApplicationController.scala +++ b/s2rest_play/app/controllers/ApplicationController.scala @@ -4,7 +4,7 @@ import com.kakao.s2graph.core.GraphExceptions.BadQueryException import com.kakao.s2graph.core.PostProcess import com.kakao.s2graph.core.utils.logger import play.api.libs.iteratee.Enumerator -import play.api.libs.json.{JsString, JsValue} +import play.api.libs.json.{JsString, JsValue, Json} import play.api.mvc._ import scala.concurrent.{ExecutionContext, Future} @@ -17,13 +17,15 @@ object ApplicationController extends Controller { val jsonParser: BodyParser[JsValue] = controllers.s2parse.json + val jsonText: BodyParser[String] = controllers.s2parse.jsonText + private def badQueryExceptionResults(ex: Exception) = Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader)) private def errorResults = Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader)) - def requestFallback(body: JsValue): PartialFunction[Throwable, 
Future[Result]] = { + def requestFallback(body: String): PartialFunction[Throwable, Future[Result]] = { case e: BadQueryException => logger.error(s"{$body}, ${e.getMessage}", e) badQueryExceptionResults(e) @@ -64,6 +66,7 @@ object ApplicationController extends Controller { case JsString(str) => str case _ => jsValue.toString } + case AnyContentAsEmpty => "" case _ => request.body.toString } @@ -73,8 +76,6 @@ object ApplicationController extends Controller { else s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}" - logger.info(s"${request.method} ${request.uri} result_size: $resultSize") - str } finally { /* pass */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/ExperimentController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/ExperimentController.scala b/s2rest_play/app/controllers/ExperimentController.scala index 40f67d1..e48b0f1 100644 --- a/s2rest_play/app/controllers/ExperimentController.scala +++ b/s2rest_play/app/controllers/ExperimentController.scala @@ -1,9 +1,10 @@ package controllers +import com.kakao.s2graph.core.mysqls.Experiment import com.kakao.s2graph.core.rest.RestHandler +import com.kakao.s2graph.core.utils.logger import play.api.mvc._ - import scala.concurrent.ExecutionContext.Implicits.global object ExperimentController extends Controller { @@ -11,10 +12,10 @@ object ExperimentController extends Controller { import ApplicationController._ - def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(parse.anyContent) { request => - val body = request.body.asJson.get - val res = rest.doPost(request.uri, body) + def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request => + val body = request.body + val res = rest.doPost(request.uri, body, request.headers.get(Experiment.impressionKey)) 
res.body.map { case js => val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString) jsonResponse(js, headers: _*) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/JsonBodyParser.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/JsonBodyParser.scala b/s2rest_play/app/controllers/JsonBodyParser.scala index 3e3d40c..4339eb4 100644 --- a/s2rest_play/app/controllers/JsonBodyParser.scala +++ b/s2rest_play/app/controllers/JsonBodyParser.scala @@ -2,17 +2,13 @@ package controllers import com.kakao.s2graph.core.utils.logger import play.api.Play -import play.api.libs.iteratee.Iteratee +import play.api.libs.iteratee.{Execution, Iteratee} import play.api.libs.json.{JsValue, Json} import play.api.mvc._ import scala.concurrent.Future import scala.util.control.NonFatal -/** - * Created by hsleep([email protected]) on 15. 9. 1.. - */ - object s2parse extends BodyParsers { import parse._ @@ -20,9 +16,26 @@ object s2parse extends BodyParsers { val defaultMaxTextLength = 1024 * 512 val defaultMaxJsonLength = 1024 * 512 -// def json: BodyParser[JsValue] = json(DEFAULT_MAX_TEXT_LENGTH) def json: BodyParser[JsValue] = json(defaultMaxTextLength) + /** + * parseText with application/json header for Pre-Process text + */ + def jsonText: BodyParser[String] = when( + _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), + jsonText(defaultMaxTextLength), + createBadResult("Expecting text/json or application/json body") + ) + + private def jsonText(maxLength: Int): BodyParser[String] = BodyParser("json, maxLength=" + maxLength) { request => + import play.api.libs.iteratee.Execution.Implicits.trampoline + import play.api.libs.iteratee.Traversable + + Traversable.takeUpTo[Array[Byte]](maxLength) + .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, "UTF-8"))) + 
.flatMap(Iteratee.eofOrElse(Results.EntityTooLarge)) + } + def json(maxLength: Int): BodyParser[JsValue] = when( _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), tolerantJson(maxLength), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/QueryController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/QueryController.scala b/s2rest_play/app/controllers/QueryController.scala index e836259..d133370 100644 --- a/s2rest_play/app/controllers/QueryController.scala +++ b/s2rest_play/app/controllers/QueryController.scala @@ -1,10 +1,10 @@ package controllers - import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls.Experiment import com.kakao.s2graph.core.rest.RestHandler -import play.api.libs.json.{JsValue, Json} -import play.api.mvc.{Controller, Request} +import play.api.libs.json.{Json} +import play.api.mvc._ import scala.language.postfixOps @@ -15,34 +15,38 @@ object QueryController extends Controller with JSONParser { private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest - def delegate(request: Request[JsValue]) = - rest.doPost(request.uri, request.body).body.map { js => - jsonResponse(js, "result_size" -> rest.calcSize(js).toString) + def delegate(request: Request[String]) = { + rest.doPost(request.uri, request.body, request.headers.get(Experiment.impressionKey)).body.map { + js => + jsonResponse(js, "result_size" -> rest.calcSize(js).toString) } recoverWith ApplicationController.requestFallback(request.body) + } - def getEdges() = withHeaderAsync(jsonParser)(delegate) + def getEdges() = withHeaderAsync(jsonText)(delegate) - def getEdgesWithGrouping() = withHeaderAsync(jsonParser)(delegate) + def getEdgesWithGrouping() = withHeaderAsync(jsonText)(delegate) - def getEdgesExcluded() = withHeaderAsync(jsonParser)(delegate) + def getEdgesExcluded() = 
withHeaderAsync(jsonText)(delegate) - def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser)(delegate) + def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonText)(delegate) - def checkEdges() = withHeaderAsync(jsonParser)(delegate) + def checkEdges() = withHeaderAsync(jsonText)(delegate) - def getEdgesGrouped() = withHeaderAsync(jsonParser)(delegate) + def getEdgesGrouped() = withHeaderAsync(jsonText)(delegate) - def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser)(delegate) + def getEdgesGroupedExcluded() = withHeaderAsync(jsonText)(delegate) - def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonParser)(delegate) + def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate) def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = - withHeaderAsync(jsonParser) { request => - val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) - rest.checkEdges(params).body.map { js => - jsonResponse(js, "result_size" -> rest.calcSize(js).toString) - } recoverWith ApplicationController.requestFallback(request.body) + withHeaderAsync(jsonText) { + request => + val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) + rest.checkEdges(params).body.map { + js => + jsonResponse(js, "result_size" -> rest.calcSize(js).toString) + } recoverWith ApplicationController.requestFallback(request.body) } - def getVertices() = withHeaderAsync(jsonParser)(delegate) + def getVertices() = withHeaderAsync(jsonText)(delegate) }
