Repository: incubator-s2graph
Updated Branches:
  refs/heads/master b58e6f99e -> e8ec7408e


[S2GRAPH-36]: Provide Blocking API for Edge/Vertex operations.

  add a withWait option to incrementCounts/mutateEdgesBulk.
  move the deleteAll logic from the rest project into s2core.
  fix a bug where the label's isAsync flag was ignored for deleteAll.
  change the kafka message format for deleteAll.

JIRA:
  [S2GRAPH-36] https://issues.apache.org/jira/browse/S2GRAPH-36

Pull Request:
  Closes #19


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8ec7408
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8ec7408
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8ec7408

Branch: refs/heads/master
Commit: e8ec7408eb6b3d924d6a8b207dac4eb169c3bf80
Parents: b58e6f9
Author: DO YUNG YOON <[email protected]>
Authored: Tue Feb 23 16:30:50 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Feb 23 16:30:50 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../scala/com/kakao/s2graph/core/Graph.scala    |   2 +-
 .../kakao/s2graph/core/rest/RequestParser.scala |   9 +-
 .../kakao/s2graph/core/storage/Storage.scala    |  17 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  | 156 +++++++++++++------
 .../app/controllers/EdgeController.scala        | 114 +++++++-------
 s2rest_play/conf/routes                         |   2 +
 7 files changed, 185 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 419de68..74c2bc8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.1 - unreleased
 
     S2GRAPH-35: Provide normalize option on query (Committed by DOYUNG YOON).
 
+    S2GRAPH-36: Provide Blocking API for Edge/Vertex operations (Committed by 
DOYUNG YOON).
+
   IMPROVEMENT
 
     S2GRAPH-14: Abstract HBase specific methods in Management and Label 
(Committed by DOYUNG YOON).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
index 58953e2..bb286c3 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala
@@ -368,7 +368,7 @@ class Graph(_config: Config)(implicit ec: ExecutionContext) 
{
 
   def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): 
Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait)
 
-  def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = 
storage.incrementCounts(edges)
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait)
 
   def shutdown(): Unit = {
     storage.flush()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index 141b2fe..63c41e0 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -348,6 +348,13 @@ class RequestParser(config: Config) extends JSONParser {
 
   }
 
+  def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], 
List[JsValue]) = {
+    val jsValues = toJsValues(jsValue)
+    val edges = jsValues.map(toEdge(_, operation))
+
+    (edges, jsValues)
+  }
+
   def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
     toJsValues(jsValue).map(toEdge(_, operation))
   }
@@ -502,7 +509,7 @@ class RequestParser(config: Config) extends JSONParser {
 
   def toDeleteParam(json: JsValue) = {
     val labelName = (json \ "label").as[String]
-    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
+    val labels = Label.findByName(labelName).map { l => Seq(l) 
}.getOrElse(Nil).filterNot(_.isAsync)
     val direction = (json \ "direction").asOpt[String].getOrElse("out")
 
     val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index bff0f3b..c9e768e 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -47,13 +47,16 @@ abstract class Storage(val config: Config)(implicit ec: 
ExecutionContext) {
     Future.sequence(futures)
   }
 
+
   def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean]
 
-  def mutateEdges(edges: Seq[Edge],
-                  withWait: Boolean = false): Future[Seq[Boolean]] = {
-    val futures = edges.map { edge => mutateEdge(edge, withWait) }
-    Future.sequence(futures)
-  }
+  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): 
Future[Seq[Boolean]]
+
+//  def mutateEdges(edges: Seq[Edge],
+//                  withWait: Boolean = false): Future[Seq[Boolean]] = {
+//    val futures = edges.map { edge => mutateEdge(edge, withWait) }
+//    Future.sequence(futures)
+//  }
 
   def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean]
 
@@ -63,9 +66,9 @@ abstract class Storage(val config: Config)(implicit ec: 
ExecutionContext) {
     Future.sequence(futures)
   }
 
-  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], 
dir: Int, ts: Long): Future[Boolean]
+  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], labels: Seq[Label], 
dir: Int, ts: Long): Future[Boolean]
 
-  def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]]
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]]
 
   def flush(): Unit
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 3ccb473..019b2bc 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -3,6 +3,7 @@ package com.kakao.s2graph.core.storage.hbase
 import java.util
 
 import com.google.common.cache.Cache
+import com.kakao.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
 import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException
 import com.kakao.s2graph.core._
 import com.kakao.s2graph.core.mysqls.{Label, LabelMeta}
@@ -17,6 +18,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.hbase.async._
 
 import scala.collection.JavaConversions._
@@ -134,50 +136,67 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
 
 
   def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] = {
-    //    mutateEdgeWithOp(edge, withWait)
-    val strongConsistency = edge.label.consistencyLevel == "strong"
     val edgeFuture =
-      if (edge.op == GraphUtil.operations("delete") && !strongConsistency) {
-        val zkQuorum = edge.label.hbaseZkAddr
-        val (_, edgeUpdate) = Edge.buildDeleteBulk(None, edge)
-        val mutations =
-          mutationBuilder.indexedEdgeMutations(edgeUpdate) ++
-            mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++
-            mutationBuilder.increments(edgeUpdate)
-        writeAsyncSimple(zkQuorum, mutations, withWait)
+      if (edge.op == GraphUtil.operations("deleteAll")) {
+        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), 
edge.labelWithDir.dir, edge.ts)
       } else {
-        mutateEdgesInner(Seq(edge), strongConsistency, 
withWait)(Edge.buildOperation)
+        val strongConsistency = edge.label.consistencyLevel == "strong"
+        if (edge.op == GraphUtil.operations("delete") && !strongConsistency) {
+          val zkQuorum = edge.label.hbaseZkAddr
+          val (_, edgeUpdate) = Edge.buildDeleteBulk(None, edge)
+          val mutations =
+            mutationBuilder.indexedEdgeMutations(edgeUpdate) ++
+              mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++
+              mutationBuilder.increments(edgeUpdate)
+          writeAsyncSimple(zkQuorum, mutations, withWait)
+        } else {
+          mutateEdgesInner(Seq(edge), strongConsistency, 
withWait)(Edge.buildOperation)
+        }
       }
+
     val vertexFuture = writeAsyncSimple(edge.label.hbaseZkAddr,
       mutationBuilder.buildVertexPutsAsync(edge), withWait)
-    Future.sequence(Seq(edgeFuture, vertexFuture)).map { rets => 
rets.forall(identity) }
+
+    Future.sequence(Seq(edgeFuture, vertexFuture)).map(_.forall(identity))
   }
 
-  override def mutateEdges(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[Boolean]] = {
-    val edgeGrouped = edges.groupBy { edge => (edge.label, 
edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq
+  override def mutateEdges(_edges: Seq[Edge], withWait: Boolean): 
Future[Seq[Boolean]] = {
+    val grouped = _edges.groupBy { edge => (edge.label, 
edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq
 
-    val ret = edgeGrouped.map { case ((label, srcId, tgtId), edges) =>
-      if (edges.isEmpty) Future.successful(true)
-      else {
-        val head = edges.head
-        val strongConsistency = head.label.consistencyLevel == "strong"
-
-        if (strongConsistency) {
-          val edgeFuture = mutateEdgesInner(edges, strongConsistency, 
withWait)(Edge.buildOperation)
-          //TODO: decide what we will do on failure on vertex put
-          val vertexFuture = writeAsyncSimple(head.label.hbaseZkAddr,
-            mutationBuilder.buildVertexPutsAsync(head), withWait)
-          Future.sequence(Seq(edgeFuture, vertexFuture)).map { rets => 
rets.forall(identity) }
-        } else {
-          Future.sequence(edges.map { edge =>
-            mutateEdge(edge, withWait = withWait)
-          }).map { rets =>
-            rets.forall(identity)
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val (deleteAllEdges, edges) = edgeGroup.partition(_.op == 
GraphUtil.operations("deleteAll"))
+
+      // DeleteAll first
+      val deleteAllFutures = deleteAllEdges.map { edge =>
+        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), 
edge.labelWithDir.dir, edge.ts)
+      }
+
+      // After deleteAll, process others
+      lazy val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val strongConsistency = edges.head.label.consistencyLevel == "strong"
+          if (strongConsistency) {
+            val edgeFuture = mutateEdgesInner(edges, strongConsistency, 
withWait)(Edge.buildOperation)
+
+            //TODO: decide what we will do on failure on vertex put
+            val puts = mutationBuilder.buildVertexPutsAsync(head)
+            val vertexFuture = writeAsyncSimple(head.label.hbaseZkAddr, puts, 
withWait)
+            Seq(edgeFuture, vertexFuture)
+          } else {
+            edges.map { edge => mutateEdge(edge, withWait = withWait) }
           }
-        }
+        case Nil => Nil
       }
+
+      val composed = for {
+        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield deleteRet ++ mutateRet
+
+      composed.map(_.forall(identity))
     }
-    Future.sequence(ret)
+
+    Future.sequence(mutateEdges)
   }
 
   def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = {
@@ -191,7 +210,8 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     }
   }
 
-  def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = {
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]] = {
+    val _client = if (withWait) clientWithFlush else client
     val defers: Seq[Deferred[(Boolean, Long)]] = for {
       edge <- edges
     } yield {
@@ -200,12 +220,14 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
       val countVal = countWithTs.innerVal.toString().toLong
       val incr = mutationBuilder.buildIncrementsCountAsync(edgeWithIndex, 
countVal).head
       val request = incr.asInstanceOf[AtomicIncrementRequest]
-      client.bufferAtomicIncrement(request) withCallback { resultCount: 
java.lang.Long =>
+      val defer = _client.bufferAtomicIncrement(request) withCallback { 
resultCount: java.lang.Long =>
         (true, resultCount.longValue())
       } recoverWith { ex =>
         logger.error(s"mutation failed. $request", ex)
         (false, -1L)
       }
+      if (withWait) defer
+      else Deferred.fromResult((true, -1L))
     }
 
     val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = 
Deferred.groupInOrder(defers)
@@ -306,7 +328,7 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     logger.debug(msg)
   }
 
-  private def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge) = {
+  private def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: 
Option[KeyValue]) = {
     val currentTs = System.currentTimeMillis()
     val lockTs = snapshotEdgeOpt match {
       case None => Option(currentTs)
@@ -316,7 +338,8 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
           case Some(pendingEdge) => pendingEdge.lockTs
         }
     }
-    val newVersion = snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
+    val newVersion = kvOpt.map(_.timestamp()).getOrElse(edge.ts) + 1
+    //      snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
     val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = 
lockTs)
     val base = snapshotEdgeOpt match {
       case None =>
@@ -329,7 +352,8 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     base.copy(version = newVersion, statusCode = 1, lockTs = None)
   }
 
-  private def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: 
SnapshotEdge, edgeMutate: EdgeMutate) = {
+  private def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: 
SnapshotEdge,
+                                   edgeMutate: EdgeMutate) = {
     val newVersion = lockEdge.version + 1
     val base = edgeMutate.newSnapshotEdge match {
       case None =>
@@ -474,9 +498,9 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
                                              edgeUpdate: EdgeMutate): 
Future[Boolean] = {
     val label = edge.label
     def oldBytes = kvOpt.map(_.value()).getOrElse(Array.empty)
-//    def oldBytes = snapshotEdgeOpt.map { e =>
-//      snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head.value
-//    }.getOrElse(Array.empty)
+    //    def oldBytes = snapshotEdgeOpt.map { e =>
+    //      snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head.value
+    //    }.getOrElse(Array.empty)
     def process(lockEdge: SnapshotEdge,
                 releaseLockEdge: SnapshotEdge,
                 _edgeMutate: EdgeMutate,
@@ -493,7 +517,7 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     }
 
 
-    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge)
+    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
     val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, 
edgeUpdate)
     snapshotEdgeOpt match {
       case None =>
@@ -510,8 +534,13 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
             if (isLockExpired) {
               val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) 
None else Option(snapshotEdge)
               val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, 
Seq(pendingEdge))
-              val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge)
-              val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, 
newLockEdge, newEdgeUpdate)
+              val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, 
kvOpt)
+              val _newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, 
newLockEdge, newEdgeUpdate)
+
+              // set lock ts as current ts
+              val newPendingEdgeOpt = 
_newReleaseLockEdge.pendingEdgeOpt.map(_.copy(lockTs = 
Option(System.currentTimeMillis())))
+              val newReleaseLockEdge = _newReleaseLockEdge.copy(pendingEdgeOpt 
= newPendingEdgeOpt)
+
               process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, 
statusCode = 0).flatMap { ret =>
                 val log = s"[Success]: Resolving expired pending 
edge.\n${pendingEdge.toLogString}"
                 throw new PartialFailureException(edge, 0, log)
@@ -621,10 +650,10 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
       "----------------------------------------------").mkString("\n")
   }
 
-  private def deleteAllFetchedEdgesAsyncOld(queryRequestWithResult: 
QueryRequestWithResult,
+  private def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
+                                            queryResult: QueryResult,
                                             requestTs: Long,
                                             retryNum: Int): Future[Boolean] = {
-    val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
     val queryParam = queryRequest.queryParam
     val zkQuorum = queryParam.label.hbaseZkAddr
     val futures = for {
@@ -669,7 +698,7 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
 
     val futures = for {
       queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
       deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
       if deleteQueryResult.edgeWithScoreLs.nonEmpty
     } yield {
@@ -681,14 +710,14 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
             * read: snapshotEdge on queryResult = O(N)
             * write: N x (relatedEdges x indices(indexedEdge) + 
1(snapshotEdge))
             */
-          mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait 
= true).map { rets => rets.forall(identity) }
+          mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait 
= true).map(_.forall(identity))
         case _ =>
 
           /**
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x 
indices)
             */
-          deleteAllFetchedEdgesAsyncOld(queryRequestWithResult, requestTs, 
MaxRetryNum)
+          deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, 
requestTs, MaxRetryNum)
       }
     }
     if (futures.isEmpty) {
@@ -715,10 +744,26 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
 
   }
 
-  def deleteAllAdjacentEdges(srcVertices: List[Vertex],
+  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
                              labels: Seq[Label],
                              dir: Int,
                              ts: Long): Future[Boolean] = {
+
+    def enqueueLogMessage() = {
+      val kafkaMessages = for {
+        vertice <- srcVertices
+        id = vertice.innerId.toIdString()
+        label <- labels
+      } yield {
+        val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
GraphUtil.fromOp(dir.toByte)).mkString("\t")
+        val topic = ExceptionHandler.failTopic
+        val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, 
tsv))
+        kafkaMsg
+      }
+
+      ExceptionHandler.enqueues(kafkaMessages)
+    }
+
     val requestTs = ts
     val queryParams = for {
       label <- labels
@@ -731,11 +776,19 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     val q = Query(srcVertices, Vector(step))
 
     //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, 
Random.nextInt(MaxBackOff) + 1) {
-    Extensions.retryOnSuccess(MaxRetryNum) {
+    val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
       fetchAndDeleteAll(q, requestTs)
     } { case (allDeleted, deleteSuccess) =>
       allDeleted && deleteSuccess
     }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
+
+    retryFuture onFailure {
+      case ex =>
+        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
+        enqueueLogMessage()
+    }
+
+    retryFuture
   }
 
   def flush(): Unit = clients.foreach { client =>
@@ -790,6 +843,7 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     val conn = ConnectionFactory.createConnection(conf)
     conn.getAdmin
   }
+
   private def enableTable(zkAddr: String, tableName: String) = {
     getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2rest_play/app/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/EdgeController.scala 
b/s2rest_play/app/controllers/EdgeController.scala
index 32af6c3..2b3b32e 100644
--- a/s2rest_play/app/controllers/EdgeController.scala
+++ b/s2rest_play/app/controllers/EdgeController.scala
@@ -24,6 +24,23 @@ object EdgeController extends Controller {
 
   private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
   private val requestParser: RequestParser = 
com.kakao.s2graph.rest.Global.s2parser
+  private def jsToStr(js: JsValue): String = js match {
+    case JsString(s) => s
+    case _ => js.toString()
+  }
+
+  def toTsv(jsValue: JsValue, op: String): String = {
+    val ts = jsToStr(jsValue \ "timestamp")
+    val from = jsToStr(jsValue \ "from")
+    val to = jsToStr(jsValue \ "to")
+    val label = jsToStr(jsValue \ "label")
+    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
+
+    (jsValue \ "direction").asOpt[String] match {
+      case None => Seq(ts, op, "e", from, to, label, props).mkString("\t")
+      case Some(dir) => Seq(ts, op, "e", from, to, label, props, 
dir).mkString("\t")
+    }
+  }
 
   def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = 
false): Future[Result] = {
     if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
@@ -31,12 +48,13 @@ object EdgeController extends Controller {
     else {
       try {
         logger.debug(s"$jsValue")
-        val edges = requestParser.toEdges(jsValue, operation)
-        for (edge <- edges) {
+        val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
+
+        for ((edge, orgJs) <- edges.zip(jsOrgs)) {
           if (edge.isAsync)
-            
ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, 
None))
+            
ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, 
Option(toTsv(orgJs, operation))))
           else
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, 
edge, None))
+            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, 
edge, Option(toTsv(orgJs, operation))))
         }
 
         val edgesToStore = edges.filterNot(e => e.isAsync)
@@ -89,8 +107,6 @@ object EdgeController extends Controller {
         val rets = elementsToStore.map { element => QueueActor.router ! 
element; true }
         Future.successful(jsonResponse(Json.toJson(rets)))
       }
-
-
     } catch {
       case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"$e"))
       case e: Throwable =>
@@ -100,7 +116,11 @@ object EdgeController extends Controller {
   }
 
   def mutateBulk() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body)
+    mutateAndPublish(request.body, withWait = false)
+  }
+
+  def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
+    mutateAndPublish(request.body, withWait = true)
   }
 
   def inserts() = withHeaderAsync(jsonParser) { request =>
@@ -135,10 +155,15 @@ object EdgeController extends Controller {
     tryMutates(request.body, "increment")
   }
 
+  def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "increment", withWait = true)
+  }
+
   def incrementCounts() = withHeaderAsync(jsonParser) { request =>
     val jsValue = request.body
     val edges = requestParser.toEdges(jsValue, "incrementCount")
-    s2.incrementCounts(edges).map { results =>
+
+    s2.incrementCounts(edges, withWait = true).map { results =>
       val json = results.map { case (isSuccess, resultCount) =>
         Json.obj("success" -> isSuccess, "result" -> resultCount)
       }
@@ -148,72 +173,47 @@ object EdgeController extends Controller {
   }
 
   def deleteAll() = withHeaderAsync(jsonParser) { request =>
-    deleteAllInner(request.body, withWait = false)
+//    deleteAllInner(request.body, withWait = false)
+    deleteAllInner(request.body, withWait = true)
   }
 
   def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
-    val deleteResults = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
-
-      val labelName = (json \ "label").as[String]
-      val labels = Label.findByName(labelName).map { l => Seq(l) 
}.getOrElse(Nil)
-      val direction = (json \ "direction").asOpt[String].getOrElse("out")
-
-      val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
-      val ts = (json \ 
"timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
-      val vertices = requestParser.toVertices(labelName, direction, ids)
-
-      /** logging for delete all request */
 
+    /** logging for delete all request */
+    def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, 
direction: String, topicOpt: Option[String]) = {
       val kafkaMessages = for {
         id <- ids
         label <- labels
       } yield {
-          val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
direction).mkString("\t")
-          val topic = if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else 
Config.KAFKA_LOG_TOPIC
-          val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, 
null, tsv))
-          kafkaMsg
+        val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), 
label.label, "{}", direction).mkString("\t")
+        val topic = topicOpt.getOrElse {
+          if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else 
Config.KAFKA_LOG_TOPIC
         }
+
+        val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, 
tsv))
+        kafkaMsg
+      }
+
       ExceptionHandler.enqueues(kafkaMessages)
+    }
 
+    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], 
ts: Long, vertices: Seq[Vertex]) = {
+      enqueueLogMessage(ids, labels, ts, direction, None)
       val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, 
GraphUtil.directions(direction), ts)
-      future.onFailure { case ex: Exception =>
-        logger.error(s"[Error]: deleteAllInner failed.", ex)
-        val kafkaMessages = for {
-          id <- ids
-          label <- labels
-        } yield {
-            val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
direction).mkString("\t")
-            val topic = failTopic
-            val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, 
null, tsv))
-            kafkaMsg
-          }
-        ExceptionHandler.enqueues(kafkaMessages)
-        throw ex
-      }
       if (withWait) {
-        future.map { ret =>
-          if (!ret) {
-            logger.error(s"[Error]: deleteAllInner failed.")
-            val kafkaMessages = for {
-              id <- ids
-              label <- labels
-            } yield {
-                val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
direction).mkString("\t")
-                val topic = failTopic
-                val kafkaMsg = KafkaMessage(new ProducerRecord[Key, 
Val](topic, null, tsv))
-                kafkaMsg
-              }
-            ExceptionHandler.enqueues(kafkaMessages)
-            false
-          } else {
-            true
-          }
-        }
+        future
       } else {
         Future.successful(true)
       }
-    })
+    }
+
+    val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
+      val (labels, direction, ids, ts, vertices) = 
requestParser.toDeleteParam(json)
+      if (labels.isEmpty || ids.isEmpty) Future.successful(true)
+      else deleteEach(labels, direction, ids, ts, vertices)
+    }
 
+    val deleteResults = Future.sequence(deleteFutures)
     deleteResults.map { rst =>
       logger.debug(s"deleteAllInner: $rst")
       Ok(s"deleted... ${rst.toString()}")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8ec7408/s2rest_play/conf/routes
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes
index df4a1ee..90838c8 100644
--- a/s2rest_play/conf/routes
+++ b/s2rest_play/conf/routes
@@ -23,8 +23,10 @@ POST        /graphs/edges/deleteAll                          
             contro
 POST        /graphs/edges/update                                          
controllers.EdgeController.updates()
 POST        /graphs/edges/updateWithWait                                  
controllers.EdgeController.updatesWithWait()
 POST        /graphs/edges/increment                                       
controllers.EdgeController.increments()
+POST        /graphs/edges/incrementWithWait                               
controllers.EdgeController.incrementsWithWait()
 POST        /graphs/edges/incrementCount                                  
controllers.EdgeController.incrementCounts()
 POST        /graphs/edges/bulk                                            
controllers.EdgeController.mutateBulk()
+POST        /graphs/edges/bulkWithWait                                    
controllers.EdgeController.mutateBulkWithWait()
 
 ## Vertex
 POST        /graphs/vertices/insert                                       
controllers.VertexController.inserts()

Reply via email to