Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 0c5df5eff -> e45d69d3c


[S2GRAPH-45]: Provide way to call specific bucket on experiment.

  pass impressionId from header into s2core so user can specify specific becket 
under experiment.

JIRA:
  [S2GRAPH-45] https://issues.apache.org/jira/browse/S2GRAPH-45

Pull Request:
  Closes #29


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e45d69d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e45d69d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e45d69d3

Branch: refs/heads/master
Commit: e45d69d3c0396d7cbc13d4bc9193e035cbda8065
Parents: 0c5df5e
Author: DO YUNG YOON <[email protected]>
Authored: Tue Feb 23 20:08:51 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Feb 23 20:08:51 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../kakao/s2graph/core/mysqls/Experiment.scala  | 14 +++---
 .../kakao/s2graph/core/rest/RestHandler.scala   | 46 ++++++++++----------
 .../app/controllers/ApplicationController.scala |  9 ++--
 .../app/controllers/ExperimentController.scala  |  9 ++--
 .../app/controllers/JsonBodyParser.scala        | 25 ++++++++---
 .../app/controllers/QueryController.scala       | 44 ++++++++++---------
 7 files changed, 88 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 602eb0d..b7c2366 100644
--- a/CHANGES
+++ b/CHANGES
@@ -12,6 +12,8 @@ Release 0.12.1 - unreleased
 
     S2GRAPH-32: Support variable such as now, day, hour on query (Committed by 
DOYUNG YOON).
 
+    S2GRAPH-45: Provide way to call specific bucket on experiment (Committed 
by DOYUNG YOON).
+
   IMPROVEMENT
 
     S2GRAPH-14: Abstract HBase specific methods in Management and Label 
(Committed by DOYUNG YOON).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
index d484914..88c85b7 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
@@ -56,12 +56,16 @@ case class Experiment(id: Option[Int],
   } yield range -> bucket
 
 
-  def findBucket(uuid: String): Option[Bucket] = {
-    val seed = experimentType match {
-      case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
-      case _ => (Random.nextInt(totalModular)) + 1
+  def findBucket(uuid: String, impIdOpt: Option[String] = None): 
Option[Bucket] = {
+    impIdOpt match {
+      case Some(impId) => Bucket.findByImpressionId(impId)
+      case None =>
+        val seed = experimentType match {
+          case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
+          case _ => Random.nextInt(totalModular) + 1
+        }
+        findBucket(seed)
     }
-    findBucket(seed)
   }
 
   def findBucket(uuidMod: Int): Option[Bucket] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
index 9647314..a9424d0 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
@@ -23,14 +23,15 @@ object RestHandler {
 class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
 
   import RestHandler._
-
-  val s2Parser = new RequestParser(graph.config)
+  val requestParser = new RequestParser(graph.config)
 
   /**
     * Public APIS
     */
-  def doPost(uri: String, jsQuery: JsValue): HandlerResult = {
+  def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): 
HandlerResult = {
     try {
+      val jsQuery = Json.parse(body)
+
       uri match {
         case "/graphs/getEdges" => 
HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
         case "/graphs/getEdges/grouped" => 
HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
@@ -43,7 +44,7 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
         case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
         case uri if uri.startsWith("/graphs/experiment") =>
           val Array(accessToken, experimentName, uuid) = 
uri.split("/").takeRight(3)
-          experiment(jsQuery, accessToken, experimentName, uuid)
+          experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
         case _ => throw new RuntimeException("route is not found")
       }
     } catch {
@@ -54,7 +55,7 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
   // TODO: Refactor to doGet
   def checkEdges(jsValue: JsValue): HandlerResult = {
     try {
-      val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue)
+      val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
 
       HandlerResult(graph.checkEdges(quads).map { case 
queryRequestWithResultLs =>
         val edgeJsons = for {
@@ -77,12 +78,13 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
   /**
     * Private APIS
     */
-  private def experiment(contentsBody: JsValue, accessToken: String, 
experimentName: String, uuid: String): HandlerResult = {
+  private def experiment(contentsBody: JsValue, accessToken: String, 
experimentName: String, uuid: String, impKeyOpt: => Option[String]): 
HandlerResult = {
+
     try {
       val bucketOpt = for {
         service <- Service.findByAccessToken(accessToken)
         experiment <- Experiment.findBy(service.id.get, experimentName)
-        bucket <- experiment.findBucket(uuid)
+        bucket <- experiment.findBucket(uuid, impKeyOpt)
       } yield bucket
 
       val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is 
not found"))
@@ -99,16 +101,15 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
   private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: 
String): HandlerResult = {
     if (bucket.isEmpty) 
HandlerResult(Future.successful(PostProcess.emptyResults))
     else {
-      val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid)
+      val body = buildRequestBody(Option(contentsBody), bucket, uuid)
       val url = new URL(bucket.apiPath)
-      val path = url.getPath()
+      val path = url.getPath
 
       // dummy log for sampling
-      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
-
-      logger.info(experimentLog)
+      val experimentLog = s"POST $path took -1 ms 200 -1 $body"
+      logger.debug(experimentLog)
 
-      doPost(path, jsonBody)
+      doPost(path, body)
     }
   }
 
@@ -132,15 +133,15 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
 
     val fetch = eachQuery(post) _
     jsonQuery match {
-      case JsArray(arr) => 
Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray)
-      case obj@JsObject(_) => fetch(s2Parser.toQuery(obj))
+      case JsArray(arr) => 
Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
+      case obj@JsObject(_) => fetch(requestParser.toQuery(obj))
       case _ => throw BadQueryException("Cannot support")
     }
   }
 
   private def getEdgesExcludedAsync(jsonQuery: JsValue)
                                    (post: (Seq[QueryRequestWithResult], 
Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-    val q = s2Parser.toQuery(jsonQuery)
+    val q = requestParser.toQuery(jsonQuery)
     val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
 
     val fetchFuture = graph.getEdges(q)
@@ -170,9 +171,13 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
     graph.getVertices(vertices) map { vertices => 
PostProcess.verticesToJson(vertices) }
   }
 
-
-  private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: 
Bucket, uuid: String): JsValue = {
+  private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: 
Bucket, uuid: String): String = {
     var body = bucket.requestBody.replace("#uuid", uuid)
+
+    //    // replace variable
+    //    body = TemplateHelper.replaceVariable(System.currentTimeMillis(), 
body)
+
+    // replace param
     for {
       requestKeyJson <- requestKeyJsonOpt
       jsObj <- requestKeyJson.asOpt[JsObject]
@@ -185,10 +190,7 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
       body = body.replace(key, replacement)
     }
 
-    Try(Json.parse(body)).recover {
-      case e: Exception =>
-        throw new BadQueryException(s"wrong or missing template parameter: 
${e.getMessage.takeWhile(_ != '\n')}")
-    } get
+    body
   }
 
   def calcSize(js: JsValue): Int = js match {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ApplicationController.scala 
b/s2rest_play/app/controllers/ApplicationController.scala
index d9384d9..5f54edd 100644
--- a/s2rest_play/app/controllers/ApplicationController.scala
+++ b/s2rest_play/app/controllers/ApplicationController.scala
@@ -4,7 +4,7 @@ import com.kakao.s2graph.core.GraphExceptions.BadQueryException
 import com.kakao.s2graph.core.PostProcess
 import com.kakao.s2graph.core.utils.logger
 import play.api.libs.iteratee.Enumerator
-import play.api.libs.json.{JsString, JsValue}
+import play.api.libs.json.{JsString, JsValue, Json}
 import play.api.mvc._
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -17,13 +17,15 @@ object ApplicationController extends Controller {
 
   val jsonParser: BodyParser[JsValue] = controllers.s2parse.json
 
+  val jsonText: BodyParser[String] = controllers.s2parse.jsonText
+
   private def badQueryExceptionResults(ex: Exception) =
     
Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
 
   private def errorResults =
     Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader))
 
-  def requestFallback(body: JsValue): PartialFunction[Throwable, 
Future[Result]] = {
+  def requestFallback(body: String): PartialFunction[Throwable, 
Future[Result]] = {
     case e: BadQueryException =>
       logger.error(s"{$body}, ${e.getMessage}", e)
       badQueryExceptionResults(e)
@@ -64,6 +66,7 @@ object ApplicationController extends Controller {
           case JsString(str) => str
           case _ => jsValue.toString
         }
+        case AnyContentAsEmpty => ""
         case _ => request.body.toString
       }
 
@@ -73,8 +76,6 @@ object ApplicationController extends Controller {
         else
           s"${request.method} ${request.uri} took ${duration} ms 
${result.header.status} ${resultSize} ${body}"
 
-      logger.info(s"${request.method} ${request.uri} result_size: $resultSize")
-
       str
     } finally {
       /* pass */

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ExperimentController.scala 
b/s2rest_play/app/controllers/ExperimentController.scala
index 40f67d1..e48b0f1 100644
--- a/s2rest_play/app/controllers/ExperimentController.scala
+++ b/s2rest_play/app/controllers/ExperimentController.scala
@@ -1,9 +1,10 @@
 package controllers
 
 
+import com.kakao.s2graph.core.mysqls.Experiment
 import com.kakao.s2graph.core.rest.RestHandler
+import com.kakao.s2graph.core.utils.logger
 import play.api.mvc._
-
 import scala.concurrent.ExecutionContext.Implicits.global
 
 object ExperimentController extends Controller {
@@ -11,10 +12,10 @@ object ExperimentController extends Controller {
 
   import ApplicationController._
 
-  def experiment(accessToken: String, experimentName: String, uuid: String) = 
withHeaderAsync(parse.anyContent) { request =>
-    val body = request.body.asJson.get
-    val res = rest.doPost(request.uri, body)
+  def experiment(accessToken: String, experimentName: String, uuid: String) = 
withHeaderAsync(jsonText) { request =>
+    val body = request.body
 
+    val res = rest.doPost(request.uri, body, 
request.headers.get(Experiment.impressionKey))
     res.body.map { case js =>
       val headers = res.headers :+ ("result_size" -> 
rest.calcSize(js).toString)
       jsonResponse(js, headers: _*)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/JsonBodyParser.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/JsonBodyParser.scala 
b/s2rest_play/app/controllers/JsonBodyParser.scala
index 3e3d40c..4339eb4 100644
--- a/s2rest_play/app/controllers/JsonBodyParser.scala
+++ b/s2rest_play/app/controllers/JsonBodyParser.scala
@@ -2,17 +2,13 @@ package controllers
 
 import com.kakao.s2graph.core.utils.logger
 import play.api.Play
-import play.api.libs.iteratee.Iteratee
+import play.api.libs.iteratee.{Execution, Iteratee}
 import play.api.libs.json.{JsValue, Json}
 import play.api.mvc._
 
 import scala.concurrent.Future
 import scala.util.control.NonFatal
 
-/**
- * Created by hsleep([email protected]) on 15. 9. 1..
- */
-
 object s2parse extends BodyParsers {
 
   import parse._
@@ -20,9 +16,26 @@ object s2parse extends BodyParsers {
   val defaultMaxTextLength = 1024 * 512
   val defaultMaxJsonLength = 1024 * 512
 
-//  def json: BodyParser[JsValue] = json(DEFAULT_MAX_TEXT_LENGTH)
   def json: BodyParser[JsValue] = json(defaultMaxTextLength)
 
+  /**
+    * parseText with application/json header for Pre-Process text
+    */
+  def jsonText: BodyParser[String] = when(
+    _.contentType.exists(m => m.equalsIgnoreCase("text/json") || 
m.equalsIgnoreCase("application/json")),
+    jsonText(defaultMaxTextLength),
+    createBadResult("Expecting text/json or application/json body")
+  )
+
+  private def jsonText(maxLength: Int): BodyParser[String] = BodyParser("json, 
maxLength=" + maxLength) { request =>
+    import play.api.libs.iteratee.Execution.Implicits.trampoline
+    import play.api.libs.iteratee.Traversable
+
+    Traversable.takeUpTo[Array[Byte]](maxLength)
+      .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, 
"UTF-8")))
+      .flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
+  }
+
   def json(maxLength: Int): BodyParser[JsValue] = when(
     _.contentType.exists(m => m.equalsIgnoreCase("text/json") || 
m.equalsIgnoreCase("application/json")),
     tolerantJson(maxLength),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e45d69d3/s2rest_play/app/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/QueryController.scala 
b/s2rest_play/app/controllers/QueryController.scala
index e836259..d133370 100644
--- a/s2rest_play/app/controllers/QueryController.scala
+++ b/s2rest_play/app/controllers/QueryController.scala
@@ -1,10 +1,10 @@
 package controllers
 
-
 import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.Experiment
 import com.kakao.s2graph.core.rest.RestHandler
-import play.api.libs.json.{JsValue, Json}
-import play.api.mvc.{Controller, Request}
+import play.api.libs.json.{Json}
+import play.api.mvc._
 
 import scala.language.postfixOps
 
@@ -15,34 +15,38 @@ object QueryController extends Controller with JSONParser {
 
   private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
 
-  def delegate(request: Request[JsValue]) =
-    rest.doPost(request.uri, request.body).body.map { js =>
-      jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+  def delegate(request: Request[String]) = {
+    rest.doPost(request.uri, request.body, 
request.headers.get(Experiment.impressionKey)).body.map {
+      js =>
+        jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
     } recoverWith ApplicationController.requestFallback(request.body)
+  }
 
-  def getEdges() = withHeaderAsync(jsonParser)(delegate)
+  def getEdges() = withHeaderAsync(jsonText)(delegate)
 
-  def getEdgesWithGrouping() = withHeaderAsync(jsonParser)(delegate)
+  def getEdgesWithGrouping() = withHeaderAsync(jsonText)(delegate)
 
-  def getEdgesExcluded() = withHeaderAsync(jsonParser)(delegate)
+  def getEdgesExcluded() = withHeaderAsync(jsonText)(delegate)
 
-  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser)(delegate)
+  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonText)(delegate)
 
-  def checkEdges() = withHeaderAsync(jsonParser)(delegate)
+  def checkEdges() = withHeaderAsync(jsonText)(delegate)
 
-  def getEdgesGrouped() = withHeaderAsync(jsonParser)(delegate)
+  def getEdgesGrouped() = withHeaderAsync(jsonText)(delegate)
 
-  def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser)(delegate)
+  def getEdgesGroupedExcluded() = withHeaderAsync(jsonText)(delegate)
 
-  def getEdgesGroupedExcludedFormatted() = 
withHeaderAsync(jsonParser)(delegate)
+  def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)
 
   def getEdge(srcId: String, tgtId: String, labelName: String, direction: 
String) =
-    withHeaderAsync(jsonParser) { request =>
-      val params = Json.arr(Json.obj("label" -> labelName, "direction" -> 
direction, "from" -> srcId, "to" -> tgtId))
-      rest.checkEdges(params).body.map { js =>
-        jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
-      } recoverWith ApplicationController.requestFallback(request.body)
+    withHeaderAsync(jsonText) {
+      request =>
+        val params = Json.arr(Json.obj("label" -> labelName, "direction" -> 
direction, "from" -> srcId, "to" -> tgtId))
+        rest.checkEdges(params).body.map {
+          js =>
+            jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+        } recoverWith ApplicationController.requestFallback(request.body)
     }
 
-  def getVertices() = withHeaderAsync(jsonParser)(delegate)
+  def getVertices() = withHeaderAsync(jsonText)(delegate)
 }

Reply via email to