Repository: incubator-s2graph
Updated Branches:
  refs/heads/master a1d91ee02 -> cf06febe5


[S2GRAPH-68]: Refactor write-write conflict resolution logic.
    Add more comments and simplify code.
    Remove the excessive re-fetch of the snapshot edge on every retry after a partial failure.
    Remove Thread.sleep between retries after a partial failure; retries are now scheduled with exponential back-off.

JIRA:
  [S2GRAPH-68] https://issues.apache.org/jira/browse/S2GRAPH-68

Pull Request:
  Closes #53

Authors:
  DOYUNG YOON: [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cf06febe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cf06febe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cf06febe

Branch: refs/heads/master
Commit: cf06febe595bac3ff71fdb93289b3e545520a26e
Parents: a1d91ee
Author: DO YUNG YOON <[email protected]>
Authored: Fri Jun 17 10:08:17 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Jun 17 10:08:17 2016 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/core/Graph.scala   |   4 +-
 .../apache/s2graph/core/storage/Storage.scala   | 712 +++++++++++--------
 s2rest_play/conf/test.conf                      |   2 +-
 3 files changed, 405 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cf06febe/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index ac1d4c1..f4ce7b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -55,12 +55,14 @@ object Graph {
     "max.retry.number" -> java.lang.Integer.valueOf(100),
     "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
     "max.back.off" -> java.lang.Integer.valueOf(100),
+    "back.off.timeout" -> java.lang.Integer.valueOf(1000),
     "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
     "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
     "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
     "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
     "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
-    "s2graph.storage.backend" -> "hbase"
+    "s2graph.storage.backend" -> "hbase",
+    "query.hardlimit" -> java.lang.Integer.valueOf(100000)
   )
 
   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cf06febe/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 92a93bd..e52c579 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,6 +19,8 @@
 
 package org.apache.s2graph.core.storage
 
+import java.util.concurrent.{TimeUnit, Executors}
+
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -35,7 +37,7 @@ import org.apache.s2graph.core.utils.{Extensions, logger}
 import scala.annotation.tailrec
 import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{Promise, ExecutionContext, Future}
 import scala.util.{Random, Try}
 
 abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
@@ -44,12 +46,18 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
   /** storage dependent configurations */
   val MaxRetryNum = config.getInt("max.retry.number")
   val MaxBackOff = config.getInt("max.back.off")
+  val BackoffTimeout = config.getInt("back.off.timeout")
   val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
   val FailProb = config.getDouble("hbase.fail.prob")
-  val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000)
-  val maxSize = config.getInt("future.cache.max.size")
-  val expireAfterWrite = config.getInt("future.cache.expire.after.write")
-  val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+  val LockExpireDuration = config.getInt("lock.expire.time")
+  val MaxSize = config.getInt("future.cache.max.size")
+  val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
+  val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  /** retry scheduler */
+  val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
+
+  val failTopic = s"mutateFailed_${config.getString("phase")}"
 
   /**
    * Compatibility table
@@ -348,7 +356,7 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
         case head :: tail =>
           //          val strongConsistency = 
edges.head.label.consistencyLevel == "strong"
           //          if (strongConsistency) {
-          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , 
withWait)(Edge.buildOperation)
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , 
withWait)
 
           //TODO: decide what we will do on failure on vertex put
           val puts = buildVertexPutsAsync(head)
@@ -392,81 +400,416 @@ abstract class Storage[R](val config: Config)(implicit 
ec: ExecutionContext) {
 
   def mutateEdgesInner(edges: Seq[Edge],
                        checkConsistency: Boolean,
-                       withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => 
(Edge, EdgeMutate)): Future[Boolean] = {
+                       withWait: Boolean): Future[Boolean] = {
+    assert(edges.nonEmpty)
     if (!checkConsistency) {
       val zkQuorum = edges.head.label.hbaseZkAddr
       val futures = edges.map { edge =>
-        val (_, edgeUpdate) = f(None, Seq(edge))
-        val mutations =
-          indexedEdgeMutations(edgeUpdate) ++
-            snapshotEdgeMutations(edgeUpdate) ++
-            increments(edgeUpdate)
+        val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge))
+        val mutations = indexedEdgeMutations(edgeUpdate) ++ 
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
         writeToStorage(zkQuorum, mutations, withWait)
       }
       Future.sequence(futures).map { rets => rets.forall(identity) }
     } else {
-      def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = {
+      fetchSnapshotEdge(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+        retry(1)(edges, 0, snapshotEdgeOpt)
+      }
+    }
+  }
 
-        fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+  def exponentialBackOff(tryNum: Int) = {
+    // time slot is divided by 10 ms
+    val slot = 10
+    Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
+  }
 
-          val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges)
-          logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}")
-          //shouldReplace false.
-          if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) {
-            logger.debug(s"${newEdge.toLogString} drop.")
-            Future.successful(true)
-          } else {
-            commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, 
edgeUpdate).map { ret =>
-              if (ret) {
-                logger.info(s"[Success] commit: 
\n${_edges.map(_.toLogString).mkString("\n")}")
+  def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte, 
fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = {
+    if (tryNum >= MaxRetryNum) {
+      edges.foreach { edge =>
+        logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
+
+        val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element 
= edge)
+        ExceptionHandler.enqueue(kafkaMessage)
+      }
+
+      Future.successful(false)
+    } else {
+      val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt)
+      future.onSuccess {
+        case success =>
+          logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
+      }
+      future recoverWith {
+        case FetchTimeoutException(retryEdge) =>
+          logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
+          /** fetch failed. re-fetch should be done */
+          fetchSnapshotEdge(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+            retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
+          }
+
+
+        case PartialFailureException(retryEdge, failedStatusCode, faileReason) 
=>
+          val status = failedStatusCode match {
+            case 0 => "AcquireLock failed."
+            case 1 => "Mutation failed."
+            case 2 => "Increment failed."
+            case 3 => "ReleaseLock failed."
+            case 4 => "Unknown"
+          }
+          logger.info(s"[Try: $tryNum], [Status: $status] partial 
fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
+
+          /** retry logic */
+          val promise = Promise[Boolean]
+          val backOff = exponentialBackOff(tryNum)
+          scheduledThreadPool.schedule(new Runnable {
+            override def run(): Unit = {
+              val future = if (failedStatusCode == 0) {
+                // acquire Lock failed. other is mutating so this thead need 
to re-fetch snapshotEdge.
+                /** fetch failed. re-fetch should be done */
+                fetchSnapshotEdge(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+                  retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
+                }
               } else {
-                throw new PartialFailureException(newEdge, 3, "commit failed.")
+                // partial failure occur while self locked and mutating.
+                //            assert(fetchedSnapshotEdgeOpt.nonEmpty)
+                retry(tryNum + 1)(edges, failedStatusCode, 
fetchedSnapshotEdgeOpt)
               }
-              true
+              promise.completeWith(future)
             }
+
+          }, backOff, TimeUnit.MILLISECONDS)
+          promise.future
+
+        case ex: Exception =>
+          logger.error("Unknown exception", ex)
+          Future.successful(false)
+      }
+    }
+  }
+
+  protected def commitUpdate(edges: Seq[Edge],
+                             statusCode: Byte,
+                             fetchedSnapshotEdgeOpt: Option[Edge]): 
Future[Boolean] = {
+//    Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
+    assert(edges.nonEmpty)
+//    assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
+
+    statusCode match {
+      case 0 =>
+        fetchedSnapshotEdgeOpt match {
+          case None =>
+          /**
+           * no one has never mutated this SN.
+           * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
+           * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, 
version = squashedEdge.ts + 1)
+           * lock = (squashedEdge, pendingE)
+           * releaseLock = (edgeMutate.newSnapshotEdge, None)
+           */
+            val (squashedEdge, edgeMutate) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+
+            assert(edgeMutate.newSnapshotEdge.isDefined)
+
+            val lockTs = Option(System.currentTimeMillis())
+            val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = 
lockTs, version = squashedEdge.ts + 1)
+            val lockSnapshotEdge = 
squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+            val releaseLockSnapshotEdge = 
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+              pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
+
+            commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, 
lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+
+          case Some(snapshotEdge) =>
+            snapshotEdge.pendingEdgeOpt match {
+              case None =>
+                /**
+                 * others finished commit on this SN. but there is no 
contention.
+                 * (squashedEdge, edgeMutate) = 
Edge.buildOperation(snapshotEdgeOpt, edges)
+                 * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, 
version = snapshotEdge.version + 1) ?
+                 * lock = (snapshotEdge, pendingE)
+                 * releaseLock = (edgeMutate.newSnapshotEdge, None)
+                 */
+                val (squashedEdge, edgeMutate) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+                if (edgeMutate.newSnapshotEdge.isEmpty) {
+                  logger.debug(s"drop this requests: 
\n${edges.map(_.toLogString).mkString("\n")}")
+                  Future.successful(true)
+                } else {
+                  val lockTs = Option(System.currentTimeMillis())
+                  val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = 
lockTs, version = snapshotEdge.version + 1)
+                  val lockSnapshotEdge = 
snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+                  val releaseLockSnapshotEdge = 
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+                    pendingEdgeOpt = None, version = lockSnapshotEdge.version 
+ 1)
+                  commitProcess(statusCode, squashedEdge, 
fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+                }
+              case Some(pendingEdge) =>
+                val isLockExpired = pendingEdge.lockTs.get + 
LockExpireDuration < System.currentTimeMillis()
+                if (isLockExpired) {
+                  /**
+                   * if pendingEdge.ts == snapshotEdge.ts =>
+                   *    (squashedEdge, edgeMutate) = Edge.buildOperation(None, 
Seq(pendingEdge))
+                   * else =>
+                   *    (squashedEdge, edgeMutate) = 
Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge))
+                   * pendingE = squashedEdge.copy(statusCode = 1, lockTs = 
now, version = snapshotEdge.version + 1)
+                   * lock = (snapshotEdge, pendingE)
+                   * releaseLock = (edgeMutate.newSnapshotEdge, None)
+                   */
+                  logger.debug(s"${pendingEdge.toLogString} has been expired.")
+                  val (squashedEdge, edgeMutate) =
+                    if (pendingEdge.ts == snapshotEdge.ts) 
Edge.buildOperation(None, pendingEdge +: edges)
+                    else Edge.buildOperation(fetchedSnapshotEdgeOpt, 
pendingEdge +: edges)
+
+                  val lockTs = Option(System.currentTimeMillis())
+                  val newPendingEdge = squashedEdge.copy(statusCode = 1, 
lockTs = lockTs, version = snapshotEdge.version + 1)
+                  val lockSnapshotEdge = 
snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge))
+                  val releaseLockSnapshotEdge = 
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+                    pendingEdgeOpt = None, version = lockSnapshotEdge.version 
+ 1)
+
+                  commitProcess(statusCode, squashedEdge, 
fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+                } else {
+                  /**
+                   * others finished commit on this SN and there is currently 
contention.
+                   * this can't be proceed so retry from re-fetch.
+                   * throw EX
+                   */
+                  val (squashedEdge, _) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+                  Future.failed(new PartialFailureException(squashedEdge, 0, 
s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]"))
+                }
+            }
+
+        }
+      case _ =>
+
+        /**
+         * statusCode > 0 which means self locked and there has been partial 
failure either on mutate, increment, releaseLock
+         */
+
+        /**
+         * this succeed to lock this SN. keep doing on commit process.
+         * if SN.isEmpty =>
+         * no one never succed to commit on this SN.
+         * this is first mutation try on this SN.
+         * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
+         * else =>
+         * assert(SN.pengingEdgeOpt.isEmpty) no-fetch after acquire lock when 
self retrying.
+         * there has been success commit on this SN.
+         * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
+         * releaseLock = (edgeMutate.newSnapshotEdge, None)
+         */
+        val _edges =
+          if (fetchedSnapshotEdgeOpt.isDefined && 
fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) 
fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
+          else edges
+        val (squashedEdge, edgeMutate) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
+        val newVersion = 
fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
+        val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
+          case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, 
pendingEdgeOpt = None, version = newVersion)
+          case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, 
pendingEdgeOpt = None, version = newVersion)
+        }
+        // lockSnapshotEdge will be ignored.
+        commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, 
releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+    }
+  }
+  /**
+   * orchestrate commit process.
+   * we separate into 4 step to avoid duplicating each step over and over.
+   * @param statusCode: current statusCode of this thread to process edges.
+   * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge.
+   * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before 
commit process begin.
+   * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this 
snapshotEdge for this thread.
+   * @param releaseLockSnapshotEdge: releaseLockEdge that will remove lock by 
storing new final merged states
+   *                               all from current request edges and fetched 
snapshotEdge.
+   * @param edgeMutate: mutations for indexEdge and snapshotEdge.
+   * @return
+   */
+  protected def commitProcess(statusCode: Byte,
+                              squashedEdge: Edge,
+                              fetchedSnapshotEdgeOpt:Option[Edge],
+                              lockSnapshotEdge: SnapshotEdge,
+                              releaseLockSnapshotEdge: SnapshotEdge,
+                              edgeMutate: EdgeMutate): Future[Boolean] = {
+    for {
+      locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, 
lockSnapshotEdge)
+      mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge, 
edgeMutate)
+      incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode, 
squashedEdge, edgeMutate)
+      lockReleased <- releaseLock(incremented, statusCode, squashedEdge, 
releaseLockSnapshotEdge)
+    } yield lockReleased
+  }
+
+  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: 
String) extends Exception
+
+  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) 
= {
+    val msg = Seq(s"[$ret] [$phase]", 
s"${snapshotEdge.toLogString()}").mkString("\n")
+    logger.debug(msg)
+  }
+
+  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, 
edgeMutate: EdgeMutate) = {
+    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
+      s"${edgeMutate.toLogString}").mkString("\n")
+    logger.debug(msg)
+  }
+
+  /**
+   * try to acquire lock on storage for this given snapshotEdge(lockEdge).
+   * @param statusCode: current statusCode of this thread to process edges.
+   * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge. only for debug
+   * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage.
+   * @param lockEdge: lockEdge to build RPC request(compareAndSet) into 
Storage.
+   * @return
+   */
+  protected def acquireLock(statusCode: Byte,
+                            squashedEdge: Edge,
+                            fetchedSnapshotEdgeOpt: Option[Edge],
+                            lockEdge: SnapshotEdge): Future[Boolean] = {
+    if (statusCode >= 1) {
+      logger.debug(s"skip acquireLock: 
[$statusCode]\n${squashedEdge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) {
+        Future.failed(new PartialFailureException(squashedEdge, 0, s"$p"))
+      } else {
+        val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
+        val oldPut = fetchedSnapshotEdgeOpt.map(e => 
snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
+        writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
+          logger.error(s"AcquireLock RPC Failed.")
+          throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC 
Failed")
+        }.map { ret =>
+          if (ret) {
+            val log = Seq(
+              "\n",
+              "=" * 50,
+              s"[Success]: acquireLock",
+              s"[RequestEdge]: ${squashedEdge.toLogString}",
+              s"[LockEdge]: ${lockEdge.toLogString()}",
+              s"[PendingEdge]: 
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
+              "=" * 50, "\n").mkString("\n")
+
+            logger.debug(log)
+            //            debug(ret, "acquireLock", edge.toSnapshotEdge)
+          } else {
+            throw new PartialFailureException(squashedEdge, 0, "hbase fail.")
           }
+          true
         }
       }
-      def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: 
(Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = {
-        if (tryNum >= MaxRetryNum) {
-          edges.foreach { edge =>
-            logger.error(s"commit failed after 
$MaxRetryNum\n${edge.toLogString}")
-            ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = 
edge))
+    }
+  }
+
+
+  /**
+   * change this snapshot's state on storage from locked into committed by
+   * storing new merged states on storage. merge state come from 
releaseLockEdge.
+   * note that releaseLock return Future.failed on predicate failure.
+   * @param predicate: indicate if this releaseLock phase should be proceed or 
not.
+   * @param statusCode: releaseLock do not use statusCode, only for debug.
+   * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge. only for debug
+   * @param releaseLockEdge: final merged states if all process goes well.
+   * @return
+   */
+  protected def releaseLock(predicate: Boolean,
+                            statusCode: Byte,
+                            squashedEdge: Edge,
+                            releaseLockEdge: SnapshotEdge): Future[Boolean] = {
+    if (!predicate) {
+      Future.failed(new PartialFailureException(squashedEdge, 3, "predicate 
failed."))
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) Future.failed(new 
PartialFailureException(squashedEdge, 3, s"$p"))
+      else {
+        val releaseLockEdgePuts = 
snapshotEdgeSerializer(releaseLockEdge).toKeyValues
+        writeToStorage(squashedEdge.label.hbaseZkAddr, releaseLockEdgePuts, 
withWait = true).recoverWith {
+          case ex: Exception =>
+            logger.error(s"ReleaseLock RPC Failed.")
+            throw new PartialFailureException(squashedEdge, 3, "ReleaseLock 
RPC Failed")
+        }.map { ret =>
+          if (ret) {
+            debug(ret, "releaseLock", squashedEdge.toSnapshotEdge)
+          } else {
+            val msg = Seq("\nFATAL ERROR\n",
+              "=" * 50,
+              squashedEdge.toLogString,
+              releaseLockEdgePuts,
+              "=" * 50,
+              "\n"
+            )
+            logger.error(msg.mkString("\n"))
+            //          error(ret, "releaseLock", edge.toSnapshotEdge)
+            throw new PartialFailureException(squashedEdge, 3, "hbase fail.")
           }
-          Future.successful(false)
-        } else {
-          val future = fn(edges, statusCode)
-          future.onSuccess {
-            case success =>
-              logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
+          true
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param predicate: indicate if this commitIndexEdgeMutations phase should 
be proceed or not.
+   * @param statusCode: current statusCode of this thread to process edges.
+   * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge. only for debug
+   * @param edgeMutate: actual collection of mutations. note that edgeMutate 
contains snapshotEdge mutations,
+   *                  but in here, we only use indexEdge's mutations.
+   * @return
+   */
+  protected def commitIndexEdgeMutations(predicate: Boolean,
+                       statusCode: Byte,
+                       squashedEdge: Edge,
+                       edgeMutate: EdgeMutate): Future[Boolean] = {
+    if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, 
"predicate failed."))
+    else {
+      if (statusCode >= 2) {
+        logger.debug(s"skip mutate: 
[$statusCode]\n${squashedEdge.toLogString}")
+        Future.successful(true)
+      } else {
+        val p = Random.nextDouble()
+        if (p < FailProb) Future.failed(new 
PartialFailureException(squashedEdge, 1, s"$p"))
+        else
+          writeToStorage(squashedEdge.label.hbaseZkAddr, 
indexedEdgeMutations(edgeMutate), withWait = true).map { ret =>
+            if (ret) {
+              debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate)
+            } else {
+              throw new PartialFailureException(squashedEdge, 1, "hbase fail.")
+            }
+            true
           }
-          future recoverWith {
-            case FetchTimeoutException(retryEdge) =>
-              logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
-              retry(tryNum + 1)(edges, statusCode)(fn)
-
-            case PartialFailureException(retryEdge, failedStatusCode, 
faileReason) =>
-              val status = failedStatusCode match {
-                case 0 => "AcquireLock failed."
-                case 1 => "Mutation failed."
-                case 2 => "Increment failed."
-                case 3 => "ReleaseLock failed."
-                case 4 => "Unknown"
-              }
+      }
+    }
+  }
 
-              Thread.sleep(Random.nextInt(MaxBackOff))
-              logger.info(s"[Try: $tryNum], [Status: $status] partial 
fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
-              retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn)
-            case ex: Exception =>
-              logger.error("Unknown exception", ex)
-              Future.successful(false)
+  /**
+   *
+   * @param predicate: indicate if this commitIndexEdgeMutations phase should 
be proceed or not.
+   * @param statusCode: current statusCode of this thread to process edges.
+   * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge. only for debug
+   * @param edgeMutate: actual collection of mutations. note that edgeMutate 
contains snapshotEdge mutations,
+   *                  but in here, we only use indexEdge's degree mutations.
+   * @return
+   */
+  protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
+                          statusCode: Byte,
+                          squashedEdge: Edge,
+                          edgeMutate: EdgeMutate): Future[Boolean] = {
+    if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, 
"predicate failed."))
+    if (statusCode >= 3) {
+      logger.debug(s"skip increment: 
[$statusCode]\n${squashedEdge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) Future.failed(new 
PartialFailureException(squashedEdge, 2, s"$p"))
+      else
+        writeToStorage(squashedEdge.label.hbaseZkAddr, increments(edgeMutate), 
withWait = true).map { ret =>
+          if (ret) {
+            debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
+          } else {
+            throw new PartialFailureException(squashedEdge, 2, "hbase fail.")
           }
+          true
         }
-      }
-      retry(1)(edges, 0)(commit)
     }
   }
 
+
+
+
+  /** end of methods for consistency */
+
   def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
                 newEdge: Edge, edgeMutate: EdgeMutate) =
     Seq("----------------------------------------------",
@@ -733,259 +1076,6 @@ abstract class Storage[R](val config: Config)(implicit 
ec: ExecutionContext) {
 //    }
 //  }
 
-  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: 
String) extends Exception
-
-  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) 
= {
-    val msg = Seq(s"[$ret] [$phase]", 
s"${snapshotEdge.toLogString()}").mkString("\n")
-    logger.debug(msg)
-  }
-
-  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, 
edgeMutate: EdgeMutate) = {
-    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
-      s"${edgeMutate.toLogString}").mkString("\n")
-    logger.debug(msg)
-  }
-
-  protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, 
kvOpt: Option[SKeyValue]) = {
-    val currentTs = System.currentTimeMillis()
-    val lockTs = snapshotEdgeOpt match {
-      case None => Option(currentTs)
-      case Some(snapshotEdge) =>
-        snapshotEdge.pendingEdgeOpt match {
-          case None => Option(currentTs)
-          case Some(pendingEdge) => pendingEdge.lockTs
-        }
-    }
-    val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1
-    //      snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
-    val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = 
lockTs)
-    val base = snapshotEdgeOpt match {
-      case None =>
-        // no one ever mutated on this snapshotEdge.
-        edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
-      case Some(snapshotEdge) =>
-        // there is at least one mutation have been succeed.
-        snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = 
Option(pendingEdge))
-    }
-    base.copy(version = newVersion, statusCode = 1, lockTs = None)
-  }
-
-  protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: 
SnapshotEdge,
-                                     edgeMutate: EdgeMutate) = {
-    val newVersion = lockEdge.version + 1
-    val base = edgeMutate.newSnapshotEdge match {
-      case None =>
-        // shouldReplace false
-        assert(snapshotEdgeOpt.isDefined)
-        snapshotEdgeOpt.get.toSnapshotEdge
-      case Some(newSnapshotEdge) => newSnapshotEdge
-    }
-    base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None)
-  }
-
-  protected def acquireLock(statusCode: Byte,
-                            edge: Edge,
-                            oldSnapshotEdgeOpt: Option[Edge],
-                            lockEdge: SnapshotEdge,
-                            oldBytes: Array[Byte]): Future[Boolean] = {
-    if (statusCode >= 1) {
-      logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p")
-      else {
-        val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
-        val oldPut = oldSnapshotEdgeOpt.map(e => 
snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
-//        val lockEdgePut = buildPutAsync(lockEdge).head
-//        val oldPut = oldSnapshotEdgeOpt.map(e => 
buildPutAsync(e.toSnapshotEdge).head)
-        writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
-          logger.error(s"AcquireLock RPC Failed.")
-          throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed")
-        }.map { ret =>
-          if (ret) {
-            val log = Seq(
-              "\n",
-              "=" * 50,
-              s"[Success]: acquireLock",
-              s"[RequestEdge]: ${edge.toLogString}",
-              s"[LockEdge]: ${lockEdge.toLogString()}",
-              s"[PendingEdge]: 
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
-              "=" * 50, "\n").mkString("\n")
-
-            logger.debug(log)
-            //            debug(ret, "acquireLock", edge.toSnapshotEdge)
-          } else {
-            throw new PartialFailureException(edge, 0, "hbase fail.")
-          }
-          true
-        }
-      }
-    }
-  }
-
-
-
-  protected def releaseLock(predicate: Boolean,
-                            edge: Edge,
-                            lockEdge: SnapshotEdge,
-                            releaseLockEdge: SnapshotEdge,
-                            _edgeMutate: EdgeMutate,
-                            oldBytes: Array[Byte]): Future[Boolean] = {
-    if (!predicate) {
-      throw new PartialFailureException(edge, 3, "predicate failed.")
-    }
-    val p = Random.nextDouble()
-    if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p")
-    else {
-      val releaseLockEdgePut = 
snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head
-      val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
-      writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith {
-        case ex: Exception =>
-          logger.error(s"ReleaseLock RPC Failed.")
-          throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed")
-      }.map { ret =>
-        if (ret) {
-          debug(ret, "releaseLock", edge.toSnapshotEdge)
-        } else {
-          val msg = Seq("\nFATAL ERROR\n",
-            "=" * 50,
-            oldBytes.toList,
-            lockEdgePut,
-            releaseLockEdgePut,
-            //            lockEdgePut.value.toList,
-            //            releaseLockEdgePut.value().toList,
-            "=" * 50,
-            "\n"
-          )
-          logger.error(msg.mkString("\n"))
-          //          error(ret, "releaseLock", edge.toSnapshotEdge)
-          throw new PartialFailureException(edge, 3, "hbase fail.")
-        }
-        true
-      }
-    }
-    Future.successful(true)
-  }
-
-
-  protected def mutate(predicate: Boolean,
-                       edge: Edge,
-                       statusCode: Byte,
-                       _edgeMutate: EdgeMutate): Future[Boolean] = {
-    if (!predicate) throw new PartialFailureException(edge, 1, "predicate 
failed.")
-
-    if (statusCode >= 2) {
-      logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p")
-      else
-        writeToStorage(edge.label.hbaseZkAddr, 
indexedEdgeMutations(_edgeMutate), withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate)
-          } else {
-            throw new PartialFailureException(edge, 1, "hbase fail.")
-          }
-          true
-        }
-    }
-  }
-
-  protected def increment(predicate: Boolean,
-                          edge: Edge,
-                          statusCode: Byte, _edgeMutate: EdgeMutate): 
Future[Boolean] = {
-    if (!predicate) throw new PartialFailureException(edge, 2, "predicate 
failed.")
-    if (statusCode >= 3) {
-      logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p")
-      else
-        writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate), 
withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate)
-          } else {
-            throw new PartialFailureException(edge, 2, "hbase fail.")
-          }
-          true
-        }
-    }
-  }
-
-
-  /** this may be overrided by specific storage implementation */
-  protected def commitProcess(edge: Edge, statusCode: Byte)
-                             (snapshotEdgeOpt: Option[Edge], kvOpt: 
Option[SKeyValue])
-                             (lockEdge: SnapshotEdge, releaseLockEdge: 
SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = {
-    val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte])
-    for {
-      locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge, 
oldBytes)
-      mutated <- mutate(locked, edge, statusCode, _edgeMutate)
-      incremented <- increment(mutated, edge, statusCode, _edgeMutate)
-      released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, 
_edgeMutate, oldBytes)
-    } yield {
-      released
-    }
-  }
-
-  protected def commitUpdate(edge: Edge,
-                             statusCode: Byte)(snapshotEdgeOpt: Option[Edge],
-                                               kvOpt: Option[SKeyValue],
-                                               edgeUpdate: EdgeMutate): 
Future[Boolean] = {
-    val label = edge.label
-    def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty)
-
-    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
-    val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, 
edgeUpdate)
-    val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_
-    snapshotEdgeOpt match {
-      case None =>
-        // no one ever did success on acquire lock.
-        _process(lockEdge, releaseLockEdge, edgeUpdate)
-      //        process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
-      case Some(snapshotEdge) =>
-        // someone did success on acquire lock at least one.
-        snapshotEdge.pendingEdgeOpt match {
-          case None =>
-            // not locked
-            _process(lockEdge, releaseLockEdge, edgeUpdate)
-          //            process(lockEdge, releaseLockEdge, edgeUpdate, 
statusCode)
-          case Some(pendingEdge) =>
-            def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < 
System.currentTimeMillis()
-            if (isLockExpired) {
-              val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) 
None else Option(snapshotEdge)
-              val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, 
Seq(pendingEdge))
-              val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, 
kvOpt)
-              val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, 
newLockEdge, newEdgeUpdate)
-              commitProcess(edge, statusCode = 0)(snapshotEdgeOpt, 
kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret =>
-                //              process(newLockEdge, newReleaseLockEdge, 
newEdgeUpdate, statusCode = 0).flatMap { ret =>
-                val log = s"[Success]: Resolving expired pending 
edge.\n${pendingEdge.toLogString}"
-                throw new PartialFailureException(edge, 0, log)
-              }
-            } else {
-              // locked
-              if (pendingEdge.ts == edge.ts && statusCode > 0) {
-                // self locked
-                val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) 
None else Option(snapshotEdge)
-                val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, 
Seq(edge))
-                val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, 
lockEdge, newEdgeUpdate)
-
-                /** lockEdge will be ignored */
-                _process(lockEdge, newReleaseLockEdge, newEdgeUpdate)
-                //                process(lockEdge, newReleaseLockEdge, 
newEdgeUpdate, statusCode)
-              } else {
-                throw new PartialFailureException(edge, statusCode, 
s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]")
-              }
-            }
-        }
-    }
-  }
-
-  /** end of methods for consistency */
-
 
   //  def futureCache[T] = Cache[Long, (Long, T)]
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cf06febe/s2rest_play/conf/test.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/test.conf b/s2rest_play/conf/test.conf
index 2325dd9..a16b926 100644
--- a/s2rest_play/conf/test.conf
+++ b/s2rest_play/conf/test.conf
@@ -18,4 +18,4 @@
 #
 
 max.retry.number=10000
-hbase.fail.prob=0.1
+hbase.fail.prob=0.05


Reply via email to