Repository: incubator-s2graph Updated Branches: refs/heads/master b58e6f99e -> e8ec7408e
[S2GRAPH-36]: Provide Blocking API for Edge/Vertex operations. Add withWait on incrementCounts/mutateEdgesBulk. Move logic for deleteAll into s2core from rest project. Bug fix for ignoring isAsync flag on label for deleteAll. Change Kafka message for deleteAll. JIRA: [S2GRAPH-36] https://issues.apache.org/jira/browse/S2GRAPH-36 Pull Request: Closes #19 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8ec7408 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8ec7408 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8ec7408 Branch: refs/heads/master Commit: e8ec7408eb6b3d924d6a8b207dac4eb169c3bf80 Parents: b58e6f9 Author: DO YUNG YOON <[email protected]> Authored: Tue Feb 23 16:30:50 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Feb 23 16:30:50 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../scala/com/kakao/s2graph/core/Graph.scala | 2 +- .../kakao/s2graph/core/rest/RequestParser.scala | 9 +- .../kakao/s2graph/core/storage/Storage.scala | 17 +- .../core/storage/hbase/AsynchbaseStorage.scala | 156 +++++++++++++------ .../app/controllers/EdgeController.scala | 114 +++++++------- s2rest_play/conf/routes | 2 + 7 files changed, 185 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 419de68..74c2bc8 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.1 - unreleased S2GRAPH-35: Provide normalize option on query (Committed by DOYUNG YOON). + S2GRAPH-36: Provide Blocking API for Edge/Vertex operations (Committed by DOYUNG YOON). 
+ IMPROVEMENT S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala index 58953e2..bb286c3 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala @@ -368,7 +368,7 @@ class Graph(_config: Config)(implicit ec: ExecutionContext) { def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait) - def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges) + def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait) def shutdown(): Unit = { storage.flush() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala index 141b2fe..63c41e0 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -348,6 +348,13 @@ class RequestParser(config: Config) extends JSONParser { } + def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = { + val jsValues = toJsValues(jsValue) + val edges = jsValues.map(toEdge(_, operation)) + + (edges, jsValues) + } + def toEdges(jsValue: JsValue, operation: String): List[Edge] = { 
toJsValues(jsValue).map(toEdge(_, operation)) } @@ -502,7 +509,7 @@ class RequestParser(config: Config) extends JSONParser { def toDeleteParam(json: JsValue) = { val labelName = (json \ "label").as[String] - val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) + val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync) val direction = (json \ "direction").asOpt[String].getOrElse("out") val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala index bff0f3b..c9e768e 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala @@ -47,13 +47,16 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { Future.sequence(futures) } + def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] - def mutateEdges(edges: Seq[Edge], - withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = edges.map { edge => mutateEdge(edge, withWait) } - Future.sequence(futures) - } + def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] + +// def mutateEdges(edges: Seq[Edge], +// withWait: Boolean = false): Future[Seq[Boolean]] = { +// val futures = edges.map { edge => mutateEdge(edge, withWait) } +// Future.sequence(futures) +// } def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] @@ -63,9 +66,9 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { Future.sequence(futures) } - def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): 
Future[Boolean] + def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] - def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] + def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] def flush(): Unit http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 3ccb473..019b2bc 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -3,6 +3,7 @@ package com.kakao.s2graph.core.storage.hbase import java.util import com.google.common.cache.Cache +import com.kakao.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val} import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException import com.kakao.s2graph.core._ import com.kakao.s2graph.core.mysqls.{Label, LabelMeta} @@ -17,6 +18,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} +import org.apache.kafka.clients.producer.ProducerRecord import org.hbase.async._ import scala.collection.JavaConversions._ @@ -134,50 +136,67 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] = { - // mutateEdgeWithOp(edge, withWait) - val strongConsistency = edge.label.consistencyLevel == "strong" val edgeFuture = - if (edge.op == 
GraphUtil.operations("delete") && !strongConsistency) { - val zkQuorum = edge.label.hbaseZkAddr - val (_, edgeUpdate) = Edge.buildDeleteBulk(None, edge) - val mutations = - mutationBuilder.indexedEdgeMutations(edgeUpdate) ++ - mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++ - mutationBuilder.increments(edgeUpdate) - writeAsyncSimple(zkQuorum, mutations, withWait) + if (edge.op == GraphUtil.operations("deleteAll")) { + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) } else { - mutateEdgesInner(Seq(edge), strongConsistency, withWait)(Edge.buildOperation) + val strongConsistency = edge.label.consistencyLevel == "strong" + if (edge.op == GraphUtil.operations("delete") && !strongConsistency) { + val zkQuorum = edge.label.hbaseZkAddr + val (_, edgeUpdate) = Edge.buildDeleteBulk(None, edge) + val mutations = + mutationBuilder.indexedEdgeMutations(edgeUpdate) ++ + mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++ + mutationBuilder.increments(edgeUpdate) + writeAsyncSimple(zkQuorum, mutations, withWait) + } else { + mutateEdgesInner(Seq(edge), strongConsistency, withWait)(Edge.buildOperation) + } } + val vertexFuture = writeAsyncSimple(edge.label.hbaseZkAddr, mutationBuilder.buildVertexPutsAsync(edge), withWait) - Future.sequence(Seq(edgeFuture, vertexFuture)).map { rets => rets.forall(identity) } + + Future.sequence(Seq(edgeFuture, vertexFuture)).map(_.forall(identity)) } - override def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { - val edgeGrouped = edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq + override def mutateEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { + val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq - val ret = edgeGrouped.map { case ((label, srcId, tgtId), edges) => - if (edges.isEmpty) Future.successful(true) - else { - val head = edges.head - val 
strongConsistency = head.label.consistencyLevel == "strong" - - if (strongConsistency) { - val edgeFuture = mutateEdgesInner(edges, strongConsistency, withWait)(Edge.buildOperation) - //TODO: decide what we will do on failure on vertex put - val vertexFuture = writeAsyncSimple(head.label.hbaseZkAddr, - mutationBuilder.buildVertexPutsAsync(head), withWait) - Future.sequence(Seq(edgeFuture, vertexFuture)).map { rets => rets.forall(identity) } - } else { - Future.sequence(edges.map { edge => - mutateEdge(edge, withWait = withWait) - }).map { rets => - rets.forall(identity) + val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => + val (deleteAllEdges, edges) = edgeGroup.partition(_.op == GraphUtil.operations("deleteAll")) + + // DeleteAll first + val deleteAllFutures = deleteAllEdges.map { edge => + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) + } + + // After deleteAll, process others + lazy val mutateEdgeFutures = edges.toList match { + case head :: tail => + val strongConsistency = edges.head.label.consistencyLevel == "strong" + if (strongConsistency) { + val edgeFuture = mutateEdgesInner(edges, strongConsistency, withWait)(Edge.buildOperation) + + //TODO: decide what we will do on failure on vertex put + val puts = mutationBuilder.buildVertexPutsAsync(head) + val vertexFuture = writeAsyncSimple(head.label.hbaseZkAddr, puts, withWait) + Seq(edgeFuture, vertexFuture) + } else { + edges.map { edge => mutateEdge(edge, withWait = withWait) } } - } + case Nil => Nil } + + val composed = for { + deleteRet <- Future.sequence(deleteAllFutures) + mutateRet <- Future.sequence(mutateEdgeFutures) + } yield deleteRet ++ mutateRet + + composed.map(_.forall(identity)) } - Future.sequence(ret) + + Future.sequence(mutateEdges) } def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = { @@ -191,7 +210,8 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, } } - def 
incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = { + def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = { + val _client = if (withWait) clientWithFlush else client val defers: Seq[Deferred[(Boolean, Long)]] = for { edge <- edges } yield { @@ -200,12 +220,14 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, val countVal = countWithTs.innerVal.toString().toLong val incr = mutationBuilder.buildIncrementsCountAsync(edgeWithIndex, countVal).head val request = incr.asInstanceOf[AtomicIncrementRequest] - client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long => + val defer = _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long => (true, resultCount.longValue()) } recoverWith { ex => logger.error(s"mutation failed. $request", ex) (false, -1L) } + if (withWait) defer + else Deferred.fromResult((true, -1L)) } val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers) @@ -306,7 +328,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, logger.debug(msg) } - private def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge) = { + private def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[KeyValue]) = { val currentTs = System.currentTimeMillis() val lockTs = snapshotEdgeOpt match { case None => Option(currentTs) @@ -316,7 +338,8 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, case Some(pendingEdge) => pendingEdge.lockTs } } - val newVersion = snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1 + val newVersion = kvOpt.map(_.timestamp()).getOrElse(edge.ts) + 1 + // snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1 val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs) val base = snapshotEdgeOpt match { case None => @@ -329,7 +352,8 @@ class AsynchbaseStorage(override val config: 
Config, vertexCache: Cache[Integer, base.copy(version = newVersion, statusCode = 1, lockTs = None) } - private def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { + private def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge, + edgeMutate: EdgeMutate) = { val newVersion = lockEdge.version + 1 val base = edgeMutate.newSnapshotEdge match { case None => @@ -474,9 +498,9 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, edgeUpdate: EdgeMutate): Future[Boolean] = { val label = edge.label def oldBytes = kvOpt.map(_.value()).getOrElse(Array.empty) -// def oldBytes = snapshotEdgeOpt.map { e => -// snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head.value -// }.getOrElse(Array.empty) + // def oldBytes = snapshotEdgeOpt.map { e => + // snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head.value + // }.getOrElse(Array.empty) def process(lockEdge: SnapshotEdge, releaseLockEdge: SnapshotEdge, _edgeMutate: EdgeMutate, @@ -493,7 +517,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, } - val lockEdge = buildLockEdge(snapshotEdgeOpt, edge) + val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt) val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate) snapshotEdgeOpt match { case None => @@ -510,8 +534,13 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, if (isLockExpired) { val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge)) - val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge) - val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate) + val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt) + val _newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate) 
+ + // set lock ts as current ts + val newPendingEdgeOpt = _newReleaseLockEdge.pendingEdgeOpt.map(_.copy(lockTs = Option(System.currentTimeMillis()))) + val newReleaseLockEdge = _newReleaseLockEdge.copy(pendingEdgeOpt = newPendingEdgeOpt) + process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret => val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}" throw new PartialFailureException(edge, 0, log) @@ -621,10 +650,10 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, "----------------------------------------------").mkString("\n") } - private def deleteAllFetchedEdgesAsyncOld(queryRequestWithResult: QueryRequestWithResult, + private def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest, + queryResult: QueryResult, requestTs: Long, retryNum: Int): Future[Boolean] = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get val queryParam = queryRequest.queryParam val zkQuorum = queryParam.label.hbaseZkAddr val futures = for { @@ -669,7 +698,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, val futures = for { queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs) if deleteQueryResult.edgeWithScoreLs.nonEmpty } yield { @@ -681,14 +710,14 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, * read: snapshotEdge on queryResult = O(N) * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) */ - mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map { rets => rets.forall(identity) } + mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) case _ => 
/** * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - deleteAllFetchedEdgesAsyncOld(queryRequestWithResult, requestTs, MaxRetryNum) + deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) } } if (futures.isEmpty) { @@ -715,10 +744,26 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, } - def deleteAllAdjacentEdges(srcVertices: List[Vertex], + def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = { + + def enqueueLogMessage() = { + val kafkaMessages = for { + vertice <- srcVertices + id = vertice.innerId.toIdString() + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") + val topic = ExceptionHandler.failTopic + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + + ExceptionHandler.enqueues(kafkaMessages) + } + val requestTs = ts val queryParams = for { label <- labels @@ -731,11 +776,19 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, val q = Query(srcVertices, Vector(step)) // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { - Extensions.retryOnSuccess(MaxRetryNum) { + val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) { fetchAndDeleteAll(q, requestTs) } { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } + + retryFuture onFailure { + case ex => + logger.error(s"[Error]: deleteAllAdjacentEdges failed.") + enqueueLogMessage() + } + + retryFuture } def flush(): Unit = clients.foreach { client => @@ -790,6 +843,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, val conn = ConnectionFactory.createConnection(conf) conn.getAdmin } + private def enableTable(zkAddr: String, tableName: String) = 
{ getAdmin(zkAddr).enableTable(TableName.valueOf(tableName)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2rest_play/app/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/EdgeController.scala b/s2rest_play/app/controllers/EdgeController.scala index 32af6c3..2b3b32e 100644 --- a/s2rest_play/app/controllers/EdgeController.scala +++ b/s2rest_play/app/controllers/EdgeController.scala @@ -24,6 +24,23 @@ object EdgeController extends Controller { private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser + private def jsToStr(js: JsValue): String = js match { + case JsString(s) => s + case _ => js.toString() + } + + def toTsv(jsValue: JsValue, op: String): String = { + val ts = jsToStr(jsValue \ "timestamp") + val from = jsToStr(jsValue \ "from") + val to = jsToStr(jsValue \ "to") + val label = jsToStr(jsValue \ "label") + val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) + + (jsValue \ "direction").asOpt[String] match { + case None => Seq(ts, op, "e", from, to, label, props).mkString("\t") + case Some(dir) => Seq(ts, op, "e", from, to, label, props, dir).mkString("\t") + } + } def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = { if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) @@ -31,12 +48,13 @@ object EdgeController extends Controller { else { try { logger.debug(s"$jsValue") - val edges = requestParser.toEdges(jsValue, operation) - for (edge <- edges) { + val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation) + + for ((edge, orgJs) <- edges.zip(jsOrgs)) { if (edge.isAsync) - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, None)) + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, Option(toTsv(orgJs, operation)))) else 
- ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, None)) + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, Option(toTsv(orgJs, operation)))) } val edgesToStore = edges.filterNot(e => e.isAsync) @@ -89,8 +107,6 @@ object EdgeController extends Controller { val rets = elementsToStore.map { element => QueueActor.router ! element; true } Future.successful(jsonResponse(Json.toJson(rets))) } - - } catch { case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) case e: Throwable => @@ -100,7 +116,11 @@ object EdgeController extends Controller { } def mutateBulk() = withHeaderAsync(parse.text) { request => - mutateAndPublish(request.body) + mutateAndPublish(request.body, withWait = false) + } + + def mutateBulkWithWait() = withHeaderAsync(parse.text) { request => + mutateAndPublish(request.body, withWait = true) } def inserts() = withHeaderAsync(jsonParser) { request => @@ -135,10 +155,15 @@ object EdgeController extends Controller { tryMutates(request.body, "increment") } + def incrementsWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "increment", withWait = true) + } + def incrementCounts() = withHeaderAsync(jsonParser) { request => val jsValue = request.body val edges = requestParser.toEdges(jsValue, "incrementCount") - s2.incrementCounts(edges).map { results => + + s2.incrementCounts(edges, withWait = true).map { results => val json = results.map { case (isSuccess, resultCount) => Json.obj("success" -> isSuccess, "result" -> resultCount) } @@ -148,72 +173,47 @@ object EdgeController extends Controller { } def deleteAll() = withHeaderAsync(jsonParser) { request => - deleteAllInner(request.body, withWait = false) +// deleteAllInner(request.body, withWait = false) + deleteAllInner(request.body, withWait = true) } def deleteAllInner(jsValue: JsValue, withWait: Boolean) = { - val deleteResults = Future.sequence(jsValue.as[Seq[JsValue]] map { json => - - val labelName = 
(json \ "label").as[String] - val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) - val direction = (json \ "direction").asOpt[String].getOrElse("out") - - val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) - val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) - val vertices = requestParser.toVertices(labelName, direction, ids) - - /** logging for delete all request */ + /** logging for delete all request */ + def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = { val kafkaMessages = for { id <- ids label <- labels } yield { - val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t") - val topic = if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg + val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), label.label, "{}", direction).mkString("\t") + val topic = topicOpt.getOrElse { + if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC } + + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + ExceptionHandler.enqueues(kafkaMessages) + } + def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = { + enqueueLogMessage(ids, labels, ts, direction, None) val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) - future.onFailure { case ex: Exception => - logger.error(s"[Error]: deleteAllInner failed.", ex) - val kafkaMessages = for { - id <- ids - label <- labels - } yield { - val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t") - val topic = failTopic - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg - } - ExceptionHandler.enqueues(kafkaMessages) - throw ex - } if 
(withWait) { - future.map { ret => - if (!ret) { - logger.error(s"[Error]: deleteAllInner failed.") - val kafkaMessages = for { - id <- ids - label <- labels - } yield { - val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t") - val topic = failTopic - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg - } - ExceptionHandler.enqueues(kafkaMessages) - false - } else { - true - } - } + future } else { Future.successful(true) } - }) + } + + val deleteFutures = jsValue.as[Seq[JsValue]].map { json => + val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json) + if (labels.isEmpty || ids.isEmpty) Future.successful(true) + else deleteEach(labels, direction, ids, ts, vertices) + } + val deleteResults = Future.sequence(deleteFutures) deleteResults.map { rst => logger.debug(s"deleteAllInner: $rst") Ok(s"deleted... ${rst.toString()}") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2rest_play/conf/routes ---------------------------------------------------------------------- diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes index df4a1ee..90838c8 100644 --- a/s2rest_play/conf/routes +++ b/s2rest_play/conf/routes @@ -23,8 +23,10 @@ POST /graphs/edges/deleteAll contro POST /graphs/edges/update controllers.EdgeController.updates() POST /graphs/edges/updateWithWait controllers.EdgeController.updatesWithWait() POST /graphs/edges/increment controllers.EdgeController.increments() +POST /graphs/edges/incrementWithWait controllers.EdgeController.incrementsWithWait() POST /graphs/edges/incrementCount controllers.EdgeController.incrementCounts() POST /graphs/edges/bulk controllers.EdgeController.mutateBulk() +POST /graphs/edges/bulkWithWait controllers.EdgeController.mutateBulkWithWait() ## Vertex POST /graphs/vertices/insert controllers.VertexController.inserts()
