Add more advanced per-Label options (walLog "drop" / "sample" with "rate") for publishing to Kafka.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3f313097 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3f313097 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3f313097 Branch: refs/heads/master Commit: 3f3130978e013cf88d7cf30774401d7f2615316f Parents: c90fb0d Author: DO YUNG YOON <[email protected]> Authored: Tue May 24 10:28:34 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu Nov 10 21:42:54 2016 +0900 ---------------------------------------------------------------------- .../s2graph/core/rest/RequestParser.scala | 48 +++--- .../core/Integrate/WeakLabelDeleteTest.scala | 7 +- .../controllers/ApplicationController.scala | 2 + .../rest/play/controllers/EdgeController.scala | 164 +++++++++---------- .../play/controllers/PublishController.scala | 2 +- 5 files changed, 108 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index e53ae5a..52ee50d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -456,43 +456,51 @@ class RequestParser(config: Config) { case arr: JsArray => arr.as[List[JsValue]] case _ => List.empty[JsValue] } + } + def jsToStr(js: JsValue): String = js match { + case JsString(s) => s + case _ => js.toString() } - def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = { - val jsValues = toJsValues(jsValue) - val edges = 
jsValues.flatMap(toEdge(_, operation)) + def parseBulkFormat(str: String): Seq[(GraphElement, String)] = { + val edgeStrs = str.split("\\n") + val elementsWithTsv = for { + edgeStr <- edgeStrs + str <- GraphUtil.parseString(edgeStr) + element <- Graph.toGraphElement(str) + } yield (element, str) - (edges, jsValues) + elementsWithTsv } - def toEdges(jsValue: JsValue, operation: String): List[Edge] = { - toJsValues(jsValue).flatMap { edgeJson => - toEdge(edgeJson, operation) - } + def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(Edge, String)] = { + val jsValues = toJsValues(jsValue) + jsValues.flatMap(toEdgeWithTsv(_, operation)) } - - private def toEdge(jsValue: JsValue, operation: String): List[Edge] = { - - def parseId(js: JsValue) = js match { - case s: JsString => s.as[String] - case o@_ => s"${o}" - } - val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_)) - val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_)) - val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId - val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId + private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = { + val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr) + val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr) + val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId + val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId val label = parse[String](jsValue, "label") val timestamp = parse[Long](jsValue, "timestamp") val direction = parseOption[String](jsValue, "direction").getOrElse("") val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") + for { srcId <- srcIds tgtId <- tgtIds } yield { - Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) 
+ val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) + val tsv = (jsValue \ "direction").asOpt[String] match { + case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t") + case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t") + } + + (edge, tsv) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala index c0ab323..3f76d59 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/WeakLabelDeleteTest.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { /** expect 4 edges */ (result \ "results").as[List[JsValue]].size should be(4) val edges = (result \ "results").as[List[JsObject]] - val edgesToStore = parser.toEdges(Json.toJson(edges), "delete") + val edgesToStore = parser.parseJsonFormat(Json.toJson(edges), "delete").map(_._1) val rets = graph.mutateEdges(edgesToStore, withWait = true) Await.result(rets, Duration(20, TimeUnit.MINUTES)) @@ -152,4 +152,3 @@ class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach { } } - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala index b328c85..13639b9 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala @@ -67,6 +67,8 @@ object ApplicationController extends Controller { else NotFound } + def skipElement(isAsync: Boolean) = !isWriteFallbackHealthy || isAsync + def toKafkaTopic(isAsync: Boolean) = { if (!isWriteFallbackHealthy) Config.KAFKA_FAIL_TOPIC else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index 5b2ef97..b78b778 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -30,159 +30,145 @@ import play.api.mvc.{Controller, Result} import scala.collection.Seq import scala.concurrent.Future +import scala.util.Random object EdgeController extends Controller { import ApplicationController._ - import ExceptionHandler._ import play.api.libs.concurrent.Execution.Implicits._ private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler - private def jsToStr(js: JsValue): String = js match { - case JsString(s) => s - case obj => obj.toString() - } - private def jsToStr(js: JsLookupResult): String = js.toOption.map(jsToStr).getOrElse("undefined") - - def toTsv(jsValue: JsValue, op: String): String = { - val ts = jsToStr(jsValue \ "timestamp") - val from = jsToStr(jsValue \ "from") - val to = jsToStr(jsValue \ "to") - val label = jsToStr(jsValue \ "label") - val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) - - (jsValue \ "direction").asOpt[String] match { - case None => Seq(ts, op, "e", from, to, label, props).mkString("\t") - case Some(dir) => Seq(ts, op, "e", from, to, label, props, dir).mkString("\t") + private def enqueue(topic: String, elem: GraphElement, tsv: String) = { + val kafkaMessage = ExceptionHandler.toKafkaMessage(topic, elem, Option(tsv)) + walLogHandler.enqueue(kafkaMessage) + } + + private def publish(graphElem: GraphElement, tsv: String) = { + val kafkaTopic = toKafkaTopic(graphElem.isAsync) + + graphElem match { + case v: Vertex => enqueue(kafkaTopic, graphElem, 
tsv) + case e: Edge => + e.label.extraOptions.get("walLog") match { + case None => enqueue(kafkaTopic, e, tsv) + case Some(walLogOpt) => + (walLogOpt \ "method").as[JsValue] match { + case JsString("drop") => // pass + case JsString("sample") => + val rate = (walLogOpt \ "rate").as[Int] + if (Random.nextInt(100) < rate) enqueue(kafkaTopic, e, tsv) + case _ => enqueue(kafkaTopic, e, tsv) + } + } } } - def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = { + private def tryMutate(elementsWithTsv: Seq[(GraphElement, String)], withWait: Boolean): Future[Result] = { if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) - else { try { - logger.debug(s"$jsValue") - val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation) - - for ((edge, orgJs) <- edges.zip(jsOrgs)) { - val kafkaTopic = toKafkaTopic(edge.isAsync) - val kafkaMessage = ExceptionHandler.toKafkaMessage(kafkaTopic, edge, Option(toTsv(orgJs, operation))) - walLogHandler.enqueue(kafkaMessage) + elementsWithTsv.foreach { case (graphElem, tsv) => + publish(graphElem, tsv) } - val edgesToStore = edges.filterNot(e => e.isAsync) - - if (withWait) { - val rets = s2.mutateEdges(edgesToStore, withWait = true) - rets.map(Json.toJson(_)).map(jsonResponse(_)) - } else { - val rets = edgesToStore.map { edge => QueueActor.router ! edge; true } - Future.successful(jsonResponse(Json.toJson(rets))) + val elementsToStore = for { + (e, _tsv) <- elementsWithTsv if !skipElement(e.isAsync) + } yield e + + if (elementsToStore.isEmpty) Future.successful(jsonResponse(JsArray())) + else { + if (withWait) { + val rets = s2.mutateElements(elementsToStore, withWait) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = elementsToStore.map { element => QueueActor.router ! 
element; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } } } catch { case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) - case e: Exception => - logger.error(s"mutateAndPublish: $e", e) + case e: Throwable => + logger.error(s"tryMutate: ${e.getMessage}", e) Future.successful(InternalServerError(s"${e.getStackTrace}")) } } } - def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = { - if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + def mutateJsonFormat(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = { + logger.debug(s"$jsValue") + val edgesWithTsv = requestParser.parseJsonFormat(jsValue, operation) + tryMutate(edgesWithTsv, withWait) + } + def mutateBulkFormat(str: String, withWait: Boolean = false): Future[Result] = { logger.debug(s"$str") - val edgeStrs = str.split("\\n") - - var vertexCnt = 0L - var edgeCnt = 0L - try { - val elements = - for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(str)) yield { - element match { - case v: Vertex => vertexCnt += 1 - case e: Edge => edgeCnt += 1 - } - val kafkaTopic = toKafkaTopic(element.isAsync) - walLogHandler.enqueue(toKafkaMessage(kafkaTopic, element, Some(str))) - element - } - - //FIXME: - val elementsToStore = elements.filterNot(e => e.isAsync) - if (withWait) { - val rets = s2.mutateElements(elementsToStore, withWait) - rets.map(Json.toJson(_)).map(jsonResponse(_)) - } else { - val rets = elementsToStore.map { element => QueueActor.router ! 
element; true } - Future.successful(jsonResponse(Json.toJson(rets))) - } - } catch { - case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) - case e: Throwable => - logger.error(s"mutateAndPublish: $e", e) - Future.successful(InternalServerError(s"${e.getStackTrace}")) - } + val elementsWithTsv = requestParser.parseBulkFormat(str) + tryMutate(elementsWithTsv, withWait) } def mutateBulk() = withHeaderAsync(parse.text) { request => - mutateAndPublish(request.body, withWait = false) + mutateBulkFormat(request.body, withWait = false) } def mutateBulkWithWait() = withHeaderAsync(parse.text) { request => - mutateAndPublish(request.body, withWait = true) + mutateBulkFormat(request.body, withWait = true) } def inserts() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "insert") + mutateJsonFormat(request.body, "insert") } def insertsWithWait() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "insert", withWait = true) + mutateJsonFormat(request.body, "insert", withWait = true) } def insertsBulk() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "insertBulk") + mutateJsonFormat(request.body, "insertBulk") } def deletes() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "delete") + mutateJsonFormat(request.body, "delete") } def deletesWithWait() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "delete", withWait = true) + mutateJsonFormat(request.body, "delete", withWait = true) } def updates() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "update") + mutateJsonFormat(request.body, "update") } def updatesWithWait() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "update", withWait = true) + mutateJsonFormat(request.body, "update", withWait = true) } def increments() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "increment") + mutateJsonFormat(request.body, "increment") } 
def incrementsWithWait() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "increment", withWait = true) + mutateJsonFormat(request.body, "increment", withWait = true) } def incrementCounts() = withHeaderAsync(jsonParser) { request => val jsValue = request.body - val edges = requestParser.toEdges(jsValue, "incrementCount") + val edgesWithTsv = requestParser.parseJsonFormat(jsValue, "incrementCount") - s2.incrementCounts(edges, withWait = true).map { results => - val json = results.map { case (isSuccess, resultCount) => - Json.obj("success" -> isSuccess, "result" -> resultCount) - } + val edges = for { + (e, _tsv) <- edgesWithTsv if !skipElement(e.isAsync) + } yield e - jsonResponse(Json.toJson(json)) + if (edges.isEmpty) Future.successful(jsonResponse(JsArray())) + else { + s2.incrementCounts(edges, withWait = true).map { results => + val json = results.map { case (isSuccess, resultCount) => + Json.obj("success" -> isSuccess, "result" -> resultCount) + } + jsonResponse(Json.toJson(json)) + } } } @@ -199,10 +185,8 @@ object EdgeController extends Controller { id <- ids label <- labels } yield { - val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), label.label, "{}", direction).mkString("\t") - val topic = topicOpt.getOrElse { - if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC - } + val tsv = Seq(ts, "deleteAll", "e", requestParser.jsToStr(id), requestParser.jsToStr(id), label.label, "{}", direction).mkString("\t") + val topic = topicOpt.getOrElse { toKafkaTopic(label.isAsync) } ExceptionHandler.toKafkaMessage(topic, tsv) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3f313097/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala index 1a037ae..0260b7a 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala @@ -69,6 +69,6 @@ object PublishController extends Controller { // } // } def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request => - EdgeController.mutateAndPublish(request.body) + EdgeController.mutateBulkFormat(request.body) } }
