More advanced options on Label for publishing to Kafka.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3f313097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3f313097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3f313097

Branch: refs/heads/master
Commit: 3f3130978e013cf88d7cf30774401d7f2615316f
Parents: c90fb0d
Author: DO YUNG YOON <[email protected]>
Authored: Tue May 24 10:28:34 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Nov 10 21:42:54 2016 +0900

----------------------------------------------------------------------
 .../s2graph/core/rest/RequestParser.scala       |  48 +++---
 .../core/Integrate/WeakLabelDeleteTest.scala    |   7 +-
 .../controllers/ApplicationController.scala     |   2 +
 .../rest/play/controllers/EdgeController.scala  | 164 +++++++++----------
 .../play/controllers/PublishController.scala    |   2 +-
 5 files changed, 108 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index e53ae5a..52ee50d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -456,43 +456,51 @@ class RequestParser(config: Config) {
       case arr: JsArray => arr.as[List[JsValue]]
       case _ => List.empty[JsValue]
     }
+  }
 
+  def jsToStr(js: JsValue): String = js match {
+    case JsString(s) => s
+    case _ => js.toString()
   }
 
-  def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], 
List[JsValue]) = {
-    val jsValues = toJsValues(jsValue)
-    val edges = jsValues.flatMap(toEdge(_, operation))
+  def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
+    val edgeStrs = str.split("\\n")
+    val elementsWithTsv = for {
+      edgeStr <- edgeStrs
+      str <- GraphUtil.parseString(edgeStr)
+      element <- Graph.toGraphElement(str)
+    } yield (element, str)
 
-    (edges, jsValues)
+    elementsWithTsv
   }
 
-  def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
-    toJsValues(jsValue).flatMap { edgeJson =>
-      toEdge(edgeJson, operation)
-    }
+  def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(Edge, 
String)] = {
+    val jsValues = toJsValues(jsValue)
+    jsValues.flatMap(toEdgeWithTsv(_, operation))
   }
 
-
-  private def toEdge(jsValue: JsValue, operation: String): List[Edge] = {
-
-    def parseId(js: JsValue) = js match {
-      case s: JsString => s.as[String]
-      case o@_ => s"${o}"
-    }
-    val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_))
-    val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_))
-    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(js => parseId(js))) ++ srcId
-    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(js => parseId(js))) ++ tgtId
+  private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, 
String)] = {
+    val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr)
+    val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr)
+    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(jsToStr)) ++ srcId
+    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => 
tos.map(jsToStr)) ++ tgtId
 
     val label = parse[String](jsValue, "label")
     val timestamp = parse[Long](jsValue, "timestamp")
     val direction = parseOption[String](jsValue, "direction").getOrElse("")
     val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+
     for {
       srcId <- srcIds
       tgtId <- tgtIds
     } yield {
-      Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, 
props.toString)
+      val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, 
direction, props.toString)
+      val tsv = (jsValue \ "direction").asOpt[String] match {
+        case None => Seq(timestamp, operation, "e", srcId, tgtId, label, 
props).mkString("\t")
+        case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, 
props, dir).mkString("\t")
+      }
+
+      (edge, tsv)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
index c0ab323..3f76d59 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with 
BeforeAndAfterEach {
     /** expect 4 edges */
     (result \ "results").as[List[JsValue]].size should be(4)
     val edges = (result \ "results").as[List[JsObject]]
-    val edgesToStore = parser.toEdges(Json.toJson(edges), "delete")
+    val edgesToStore = parser.parseJsonFormat(Json.toJson(edges), 
"delete").map(_._1)
     val rets = graph.mutateEdges(edgesToStore, withWait = true)
     Await.result(rets, Duration(20, TimeUnit.MINUTES))
 
@@ -152,4 +152,3 @@ class WeakLabelDeleteTest extends IntegrateCommon with 
BeforeAndAfterEach {
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index b328c85..13639b9 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -67,6 +67,8 @@ object ApplicationController extends Controller {
     else NotFound
   }
 
+  def skipElement(isAsync: Boolean) = !isWriteFallbackHealthy || isAsync
+
   def toKafkaTopic(isAsync: Boolean) = {
     if (!isWriteFallbackHealthy) Config.KAFKA_FAIL_TOPIC
     else {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 5b2ef97..b78b778 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -30,159 +30,145 @@ import play.api.mvc.{Controller, Result}
 
 import scala.collection.Seq
 import scala.concurrent.Future
+import scala.util.Random
 
 object EdgeController extends Controller {
 
   import ApplicationController._
-  import ExceptionHandler._
   import play.api.libs.concurrent.Execution.Implicits._
 
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = 
org.apache.s2graph.rest.play.Global.s2parser
   private val walLogHandler: ExceptionHandler = 
org.apache.s2graph.rest.play.Global.wallLogHandler
 
-  private def jsToStr(js: JsValue): String = js match {
-    case JsString(s) => s
-    case obj => obj.toString()
-  }
-  private def jsToStr(js: JsLookupResult): String = 
js.toOption.map(jsToStr).getOrElse("undefined")
-
-  def toTsv(jsValue: JsValue, op: String): String = {
-    val ts = jsToStr(jsValue \ "timestamp")
-    val from = jsToStr(jsValue \ "from")
-    val to = jsToStr(jsValue \ "to")
-    val label = jsToStr(jsValue \ "label")
-    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-
-    (jsValue \ "direction").asOpt[String] match {
-      case None => Seq(ts, op, "e", from, to, label, props).mkString("\t")
-      case Some(dir) => Seq(ts, op, "e", from, to, label, props, 
dir).mkString("\t")
+  private def enqueue(topic: String, elem: GraphElement, tsv: String) = {
+    val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, 
Option(tsv))
+    walLogHandler.enqueue(kafkaMessage)
+  }
+
+  private def publish(graphElem: GraphElement, tsv: String) = {
+    val kafkaTopic = toKafkaTopic(graphElem.isAsync)
+
+    graphElem match {
+      case v: Vertex => enqueue(kafkaTopic, graphElem, tsv)
+      case e: Edge =>
+        e.label.extraOptions.get("walLog") match {
+          case None => enqueue(kafkaTopic, e, tsv)
+          case Some(walLogOpt) =>
+            (walLogOpt \ "method").as[JsValue] match {
+              case JsString("drop") => // pass
+              case JsString("sample") =>
+                val rate = (walLogOpt \ "rate").as[Int]
+                if (Random.nextInt(100) < rate) enqueue(kafkaTopic, e, tsv)
+              case _ => enqueue(kafkaTopic, e, tsv)
+            }
+        }
     }
   }
 
-  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = 
false): Future[Result] = {
+  private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], 
withWait: Boolean): Future[Result] = {
     if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-
     else {
       try {
-        logger.debug(s"$jsValue")
-        val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
-
-        for ((edge, orgJs) <- edges.zip(jsOrgs)) {
-          val kafkaTopic = toKafkaTopic(edge.isAsync)
-          val kafkaMessage = ExceptionHandler.toKafkaMessage(kafkaTopic, edge, 
Option(toTsv(orgJs, operation)))
-          walLogHandler.enqueue(kafkaMessage)
+        elementsWithTsv.foreach { case (graphElem, tsv) =>
+          publish(graphElem, tsv)
         }
 
-        val edgesToStore = edges.filterNot(e => e.isAsync)
-
-        if (withWait) {
-          val rets = s2.mutateEdges(edgesToStore, withWait = true)
-          rets.map(Json.toJson(_)).map(jsonResponse(_))
-        } else {
-          val rets = edgesToStore.map { edge => QueueActor.router ! edge; true 
}
-          Future.successful(jsonResponse(Json.toJson(rets)))
+        val elementsToStore = for {
+          (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync)
+        } yield e
+
+        if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray()))
+        else {
+          if (withWait) {
+            val rets = s2.mutateElements(elementsToStore, withWait)
+            rets.map(Json.toJson(_)).map(jsonResponse(_))
+          } else {
+            val rets = elementsToStore.map { element => QueueActor.router ! 
element; true }
+            Future.successful(jsonResponse(Json.toJson(rets)))
+          }
         }
       } catch {
         case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"$e"))
-        case e: Exception =>
-          logger.error(s"mutateAndPublish: $e", e)
+        case e: Throwable =>
+          logger.error(s"tryMutate: ${e.getMessage}", e)
           Future.successful(InternalServerError(s"${e.getStackTrace}"))
       }
     }
   }
 
-  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] 
= {
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+  def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean 
= false): Future[Result] = {
+    logger.debug(s"$jsValue")
+    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation)
+    tryMutate(edgesWithTsv, withWait)
+  }
 
+  def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] 
= {
     logger.debug(s"$str")
-    val edgeStrs = str.split("\\n")
-
-    var vertexCnt = 0L
-    var edgeCnt = 0L
-    try {
-      val elements =
-        for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); 
element <- Graph.toGraphElement(str)) yield {
-          element match {
-            case v: Vertex => vertexCnt += 1
-            case e: Edge => edgeCnt += 1
-          }
-          val kafkaTopic = toKafkaTopic(element.isAsync)
-          walLogHandler.enqueue(toKafkaMessage(kafkaTopic, element, Some(str)))
-          element
-        }
-
-      //FIXME:
-      val elementsToStore = elements.filterNot(e => e.isAsync)
-      if (withWait) {
-        val rets = s2.mutateElements(elementsToStore, withWait)
-        rets.map(Json.toJson(_)).map(jsonResponse(_))
-      } else {
-        val rets = elementsToStore.map { element => QueueActor.router ! 
element; true }
-        Future.successful(jsonResponse(Json.toJson(rets)))
-      }
-    } catch {
-      case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"$e"))
-      case e: Throwable =>
-        logger.error(s"mutateAndPublish: $e", e)
-        Future.successful(InternalServerError(s"${e.getStackTrace}"))
-    }
+    val elementsWithTsv = requestParser.parseBulkFormat(str)
+    tryMutate(elementsWithTsv, withWait)
   }
 
   def mutateBulk() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body, withWait = false)
+    mutateBulkFormat(request.body, withWait = false)
   }
 
   def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body, withWait = true)
+    mutateBulkFormat(request.body, withWait = true)
   }
 
   def inserts() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert")
+    mutateJsonFormat(request.body, "insert")
   }
 
   def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert", withWait = true)
+    mutateJsonFormat(request.body, "insert", withWait = true)
   }
 
   def insertsBulk() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insertBulk")
+    mutateJsonFormat(request.body, "insertBulk")
   }
 
   def deletes() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete")
+    mutateJsonFormat(request.body, "delete")
   }
 
   def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete", withWait = true)
+    mutateJsonFormat(request.body, "delete", withWait = true)
   }
 
   def updates() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update")
+    mutateJsonFormat(request.body, "update")
   }
 
   def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update", withWait = true)
+    mutateJsonFormat(request.body, "update", withWait = true)
   }
 
   def increments() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment")
+    mutateJsonFormat(request.body, "increment")
   }
 
   def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment", withWait = true)
+    mutateJsonFormat(request.body, "increment", withWait = true)
   }
 
   def incrementCounts() = withHeaderAsync(jsonParser) { request =>
     val jsValue = request.body
-    val edges = requestParser.toEdges(jsValue, "incrementCount")
+    val edgesWithTsv = requestParser.parseJsonFormat(jsValue, "incrementCount")
 
-    s2.incrementCounts(edges, withWait = true).map { results =>
-      val json = results.map { case (isSuccess, resultCount) =>
-        Json.obj("success" -> isSuccess, "result" -> resultCount)
-      }
+    val edges = for {
+      (e, _tsv) <- edgesWithTsv if !skipElement(e.isAsync)
+    } yield e
 
-      jsonResponse(Json.toJson(json))
+    if (edges.isEmpty) Future.successful(jsonResponse(JsArray()))
+    else {
+      s2.incrementCounts(edges, withWait = true).map { results =>
+        val json = results.map { case (isSuccess, resultCount) =>
+          Json.obj("success" -> isSuccess, "result" -> resultCount)
+        }
+        jsonResponse(Json.toJson(json))
+      }
     }
   }
 
@@ -199,10 +185,8 @@ object EdgeController extends Controller {
         id <- ids
         label <- labels
       } yield {
-        val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), 
label.label, "{}", direction).mkString("\t")
-        val topic = topicOpt.getOrElse {
-          if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else 
Config.KAFKA_LOG_TOPIC
-        }
+        val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), 
requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t")
+        val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) }
 
         ExceptionHandler.toKafkaMessage(topic, tsv)
       }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 1a037ae..0260b7a 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -69,6 +69,6 @@ object PublishController extends Controller {
   //    }
   //  }
   def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
-    EdgeController.mutateAndPublish(request.body)
+    EdgeController.mutateBulkFormat(request.body)
   }
 }

Reply via email to