Repository: incubator-s2graph
Updated Branches:
refs/heads/master a1d91ee02 -> cf06febe5
[S2GRAPH-68]: Refactor write-write conflict resolving logic.
Add more comments and simplify the code.
Remove the excessive re-fetch on every retry after a partial failure.
Remove Thread.sleep between retries after a partial failure.
JIRA:
[S2GRAPH-68] https://issues.apache.org/jira/browse/S2GRAPH-68
Pull Request:
Closes #53
Authors:
DOYUNG YOON: [email protected]
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cf06febe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cf06febe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cf06febe
Branch: refs/heads/master
Commit: cf06febe595bac3ff71fdb93289b3e545520a26e
Parents: a1d91ee
Author: DO YUNG YOON <[email protected]>
Authored: Fri Jun 17 10:08:17 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Jun 17 10:08:17 2016 +0900
----------------------------------------------------------------------
.../scala/org/apache/s2graph/core/Graph.scala | 4 +-
.../apache/s2graph/core/storage/Storage.scala | 712 +++++++++++--------
s2rest_play/conf/test.conf | 2 +-
3 files changed, 405 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cf06febe/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index ac1d4c1..f4ce7b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -55,12 +55,14 @@ object Graph {
"max.retry.number" -> java.lang.Integer.valueOf(100),
"lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
"max.back.off" -> java.lang.Integer.valueOf(100),
+ "back.off.timeout" -> java.lang.Integer.valueOf(1000),
"hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
"delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
- "s2graph.storage.backend" -> "hbase"
+ "s2graph.storage.backend" -> "hbase",
+ "query.hardlimit" -> java.lang.Integer.valueOf(100000)
)
var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cf06febe/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 92a93bd..e52c579 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,6 +19,8 @@
package org.apache.s2graph.core.storage
+import java.util.concurrent.{TimeUnit, Executors}
+
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.producer.ProducerRecord
@@ -35,7 +37,7 @@ import org.apache.s2graph.core.utils.{Extensions, logger}
import scala.annotation.tailrec
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{Promise, ExecutionContext, Future}
import scala.util.{Random, Try}
abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
@@ -44,12 +46,18 @@ abstract class Storage[R](val config: Config)(implicit ec:
ExecutionContext) {
/** storage dependent configurations */
val MaxRetryNum = config.getInt("max.retry.number")
val MaxBackOff = config.getInt("max.back.off")
+ val BackoffTimeout = config.getInt("back.off.timeout")
val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
val FailProb = config.getDouble("hbase.fail.prob")
- val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000)
- val maxSize = config.getInt("future.cache.max.size")
- val expireAfterWrite = config.getInt("future.cache.expire.after.write")
- val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+ val LockExpireDuration = config.getInt("lock.expire.time")
+ val MaxSize = config.getInt("future.cache.max.size")
+ val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
+ val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+ /** retry scheduler */
+ val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
+
+ val failTopic = s"mutateFailed_${config.getString("phase")}"
/**
* Compatibility table
@@ -348,7 +356,7 @@ abstract class Storage[R](val config: Config)(implicit ec:
ExecutionContext) {
case head :: tail =>
// val strongConsistency =
edges.head.label.consistencyLevel == "strong"
// if (strongConsistency) {
- val edgeFuture = mutateEdgesInner(edges, checkConsistency = true ,
withWait)(Edge.buildOperation)
+ val edgeFuture = mutateEdgesInner(edges, checkConsistency = true ,
withWait)
//TODO: decide what we will do on failure on vertex put
val puts = buildVertexPutsAsync(head)
@@ -392,81 +400,416 @@ abstract class Storage[R](val config: Config)(implicit
ec: ExecutionContext) {
def mutateEdgesInner(edges: Seq[Edge],
checkConsistency: Boolean,
- withWait: Boolean)(f: (Option[Edge], Seq[Edge]) =>
(Edge, EdgeMutate)): Future[Boolean] = {
+ withWait: Boolean): Future[Boolean] = {
+ assert(edges.nonEmpty)
if (!checkConsistency) {
val zkQuorum = edges.head.label.hbaseZkAddr
val futures = edges.map { edge =>
- val (_, edgeUpdate) = f(None, Seq(edge))
- val mutations =
- indexedEdgeMutations(edgeUpdate) ++
- snapshotEdgeMutations(edgeUpdate) ++
- increments(edgeUpdate)
+ val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge))
+ val mutations = indexedEdgeMutations(edgeUpdate) ++
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
writeToStorage(zkQuorum, mutations, withWait)
}
Future.sequence(futures).map { rets => rets.forall(identity) }
} else {
- def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = {
+ fetchSnapshotEdge(edges.head).flatMap { case (queryParam,
snapshotEdgeOpt, kvOpt) =>
+ retry(1)(edges, 0, snapshotEdgeOpt)
+ }
+ }
+ }
- fetchSnapshotEdge(_edges.head) flatMap { case (queryParam,
snapshotEdgeOpt, kvOpt) =>
+ def exponentialBackOff(tryNum: Int) = {
+ // time slot is divided by 10 ms
+ val slot = 10
+ Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
+ }
- val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges)
- logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}")
- //shouldReplace false.
- if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) {
- logger.debug(s"${newEdge.toLogString} drop.")
- Future.successful(true)
- } else {
- commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt,
edgeUpdate).map { ret =>
- if (ret) {
- logger.info(s"[Success] commit:
\n${_edges.map(_.toLogString).mkString("\n")}")
+ def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte,
fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = {
+ if (tryNum >= MaxRetryNum) {
+ edges.foreach { edge =>
+ logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
+
+ val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element
= edge)
+ ExceptionHandler.enqueue(kafkaMessage)
+ }
+
+ Future.successful(false)
+ } else {
+ val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt)
+ future.onSuccess {
+ case success =>
+ logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
+ }
+ future recoverWith {
+ case FetchTimeoutException(retryEdge) =>
+ logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
+ /** fetch failed. re-fetch should be done */
+ fetchSnapshotEdge(edges.head).flatMap { case (queryParam,
snapshotEdgeOpt, kvOpt) =>
+ retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
+ }
+
+
+ case PartialFailureException(retryEdge, failedStatusCode, faileReason)
=>
+ val status = failedStatusCode match {
+ case 0 => "AcquireLock failed."
+ case 1 => "Mutation failed."
+ case 2 => "Increment failed."
+ case 3 => "ReleaseLock failed."
+ case 4 => "Unknown"
+ }
+ logger.info(s"[Try: $tryNum], [Status: $status] partial
fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
+
+ /** retry logic */
+ val promise = Promise[Boolean]
+ val backOff = exponentialBackOff(tryNum)
+ scheduledThreadPool.schedule(new Runnable {
+ override def run(): Unit = {
+ val future = if (failedStatusCode == 0) {
+ // acquire Lock failed. another thread is mutating, so this thread needs
to re-fetch snapshotEdge.
+ /** fetch failed. re-fetch should be done */
+ fetchSnapshotEdge(edges.head).flatMap { case (queryParam,
snapshotEdgeOpt, kvOpt) =>
+ retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
+ }
} else {
- throw new PartialFailureException(newEdge, 3, "commit failed.")
+ // partial failure occur while self locked and mutating.
+ // assert(fetchedSnapshotEdgeOpt.nonEmpty)
+ retry(tryNum + 1)(edges, failedStatusCode,
fetchedSnapshotEdgeOpt)
}
- true
+ promise.completeWith(future)
}
+
+ }, backOff, TimeUnit.MILLISECONDS)
+ promise.future
+
+ case ex: Exception =>
+ logger.error("Unknown exception", ex)
+ Future.successful(false)
+ }
+ }
+ }
+
+ protected def commitUpdate(edges: Seq[Edge],
+ statusCode: Byte,
+ fetchedSnapshotEdgeOpt: Option[Edge]):
Future[Boolean] = {
+// Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
+ assert(edges.nonEmpty)
+// assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
+
+ statusCode match {
+ case 0 =>
+ fetchedSnapshotEdgeOpt match {
+ case None =>
+ /**
+ * no one has ever mutated this SN.
+ * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
+ * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now,
version = squashedEdge.ts + 1)
+ * lock = (squashedEdge, pendingE)
+ * releaseLock = (edgeMutate.newSnapshotEdge, None)
+ */
+ val (squashedEdge, edgeMutate) =
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+
+ assert(edgeMutate.newSnapshotEdge.isDefined)
+
+ val lockTs = Option(System.currentTimeMillis())
+ val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs =
lockTs, version = squashedEdge.ts + 1)
+ val lockSnapshotEdge =
squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+ val releaseLockSnapshotEdge =
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+ pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
+
+ commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt,
lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+
+ case Some(snapshotEdge) =>
+ snapshotEdge.pendingEdgeOpt match {
+ case None =>
+ /**
+ * others finished commit on this SN. but there is no
contention.
+ * (squashedEdge, edgeMutate) =
Edge.buildOperation(snapshotEdgeOpt, edges)
+ * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now,
version = snapshotEdge.version + 1) ?
+ * lock = (snapshotEdge, pendingE)
+ * releaseLock = (edgeMutate.newSnapshotEdge, None)
+ */
+ val (squashedEdge, edgeMutate) =
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+ if (edgeMutate.newSnapshotEdge.isEmpty) {
+ logger.debug(s"drop this requests:
\n${edges.map(_.toLogString).mkString("\n")}")
+ Future.successful(true)
+ } else {
+ val lockTs = Option(System.currentTimeMillis())
+ val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs =
lockTs, version = snapshotEdge.version + 1)
+ val lockSnapshotEdge =
snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+ val releaseLockSnapshotEdge =
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+ pendingEdgeOpt = None, version = lockSnapshotEdge.version
+ 1)
+ commitProcess(statusCode, squashedEdge,
fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+ }
+ case Some(pendingEdge) =>
+ val isLockExpired = pendingEdge.lockTs.get +
LockExpireDuration < System.currentTimeMillis()
+ if (isLockExpired) {
+ /**
+ * if pendingEdge.ts == snapshotEdge.ts =>
+ * (squashedEdge, edgeMutate) = Edge.buildOperation(None,
Seq(pendingEdge))
+ * else =>
+ * (squashedEdge, edgeMutate) =
Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge))
+ * pendingE = squashedEdge.copy(statusCode = 1, lockTs =
now, version = snapshotEdge.version + 1)
+ * lock = (snapshotEdge, pendingE)
+ * releaseLock = (edgeMutate.newSnapshotEdge, None)
+ */
+ logger.debug(s"${pendingEdge.toLogString} has been expired.")
+ val (squashedEdge, edgeMutate) =
+ if (pendingEdge.ts == snapshotEdge.ts)
Edge.buildOperation(None, pendingEdge +: edges)
+ else Edge.buildOperation(fetchedSnapshotEdgeOpt,
pendingEdge +: edges)
+
+ val lockTs = Option(System.currentTimeMillis())
+ val newPendingEdge = squashedEdge.copy(statusCode = 1,
lockTs = lockTs, version = snapshotEdge.version + 1)
+ val lockSnapshotEdge =
snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge))
+ val releaseLockSnapshotEdge =
edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+ pendingEdgeOpt = None, version = lockSnapshotEdge.version
+ 1)
+
+ commitProcess(statusCode, squashedEdge,
fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+ } else {
+ /**
+ * others finished commit on this SN and there is currently
contention.
+ * this cannot proceed, so retry from re-fetch.
+ * throw EX
+ */
+ val (squashedEdge, _) =
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+ Future.failed(new PartialFailureException(squashedEdge, 0,
s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]"))
+ }
+ }
+
+ }
+ case _ =>
+
+ /**
+ * statusCode > 0 which means self locked and there has been partial
failure either on mutate, increment, releaseLock
+ */
+
+ /**
+ * this succeed to lock this SN. keep doing on commit process.
+ * if SN.isEmpty =>
+ * no one never succed to commit on this SN.
+ * this is first mutation try on this SN.
+ * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
+ * else =>
+ * assert(SN.pendingEdgeOpt.isEmpty) no-fetch after acquire lock when
self retrying.
+ * there has been success commit on this SN.
+ * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
+ * releaseLock = (edgeMutate.newSnapshotEdge, None)
+ */
+ val _edges =
+ if (fetchedSnapshotEdgeOpt.isDefined &&
fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined)
fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
+ else edges
+ val (squashedEdge, edgeMutate) =
Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
+ val newVersion =
fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
+ val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
+ case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0,
pendingEdgeOpt = None, version = newVersion)
+ case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0,
pendingEdgeOpt = None, version = newVersion)
+ }
+ // lockSnapshotEdge will be ignored.
+ commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt,
releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+ }
+ }
+ /**
+ * orchestrate commit process.
+ * we separate into 4 step to avoid duplicating each step over and over.
+ * @param statusCode: current statusCode of this thread to process edges.
+ * @param squashedEdge: squashed(in memory) final edge from input edges on
same snapshotEdge.
+ * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before
commit process begin.
+ * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this
snapshotEdge for this thread.
+ * @param releaseLockSnapshotEdge: releaseLockEdge that will remove lock by
storing new final merged states
+ * all from current request edges and fetched
snapshotEdge.
+ * @param edgeMutate: mutations for indexEdge and snapshotEdge.
+ * @return
+ */
+ protected def commitProcess(statusCode: Byte,
+ squashedEdge: Edge,
+ fetchedSnapshotEdgeOpt:Option[Edge],
+ lockSnapshotEdge: SnapshotEdge,
+ releaseLockSnapshotEdge: SnapshotEdge,
+ edgeMutate: EdgeMutate): Future[Boolean] = {
+ for {
+ locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt,
lockSnapshotEdge)
+ mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge,
edgeMutate)
+ incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode,
squashedEdge, edgeMutate)
+ lockReleased <- releaseLock(incremented, statusCode, squashedEdge,
releaseLockSnapshotEdge)
+ } yield lockReleased
+ }
+
+ case class PartialFailureException(edge: Edge, statusCode: Byte, failReason:
String) extends Exception
+
+ protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge)
= {
+ val msg = Seq(s"[$ret] [$phase]",
s"${snapshotEdge.toLogString()}").mkString("\n")
+ logger.debug(msg)
+ }
+
+ protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge,
edgeMutate: EdgeMutate) = {
+ val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
+ s"${edgeMutate.toLogString}").mkString("\n")
+ logger.debug(msg)
+ }
+
+ /**
+ * try to acquire lock on storage for this given snapshotEdge(lockEdge).
+ * @param statusCode: current statusCode of this thread to process edges.
+ * @param squashedEdge: squashed(in memory) final edge from input edges on
same snapshotEdge. only for debug
+ * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage.
+ * @param lockEdge: lockEdge to build RPC request(compareAndSet) into
Storage.
+ * @return
+ */
+ protected def acquireLock(statusCode: Byte,
+ squashedEdge: Edge,
+ fetchedSnapshotEdgeOpt: Option[Edge],
+ lockEdge: SnapshotEdge): Future[Boolean] = {
+ if (statusCode >= 1) {
+ logger.debug(s"skip acquireLock:
[$statusCode]\n${squashedEdge.toLogString}")
+ Future.successful(true)
+ } else {
+ val p = Random.nextDouble()
+ if (p < FailProb) {
+ Future.failed(new PartialFailureException(squashedEdge, 0, s"$p"))
+ } else {
+ val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
+ val oldPut = fetchedSnapshotEdgeOpt.map(e =>
snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
+ writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
+ logger.error(s"AcquireLock RPC Failed.")
+ throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC
Failed")
+ }.map { ret =>
+ if (ret) {
+ val log = Seq(
+ "\n",
+ "=" * 50,
+ s"[Success]: acquireLock",
+ s"[RequestEdge]: ${squashedEdge.toLogString}",
+ s"[LockEdge]: ${lockEdge.toLogString()}",
+ s"[PendingEdge]:
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
+ "=" * 50, "\n").mkString("\n")
+
+ logger.debug(log)
+ // debug(ret, "acquireLock", edge.toSnapshotEdge)
+ } else {
+ throw new PartialFailureException(squashedEdge, 0, "hbase fail.")
}
+ true
}
}
- def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn:
(Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = {
- if (tryNum >= MaxRetryNum) {
- edges.foreach { edge =>
- logger.error(s"commit failed after
$MaxRetryNum\n${edge.toLogString}")
- ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element =
edge))
+ }
+ }
+
+
+ /**
+ * change this snapshot's state on storage from locked into committed by
+ * storing new merged states on storage. merge state come from
releaseLockEdge.
+ * note that releaseLock returns Future.failed on predicate failure.
+ * @param predicate: indicates whether this releaseLock phase should
proceed.
+ * @param statusCode: releaseLock do not use statusCode, only for debug.
+ * @param squashedEdge: squashed(in memory) final edge from input edges on
same snapshotEdge. only for debug
+ * @param releaseLockEdge: final merged states if all process goes well.
+ * @return
+ */
+ protected def releaseLock(predicate: Boolean,
+ statusCode: Byte,
+ squashedEdge: Edge,
+ releaseLockEdge: SnapshotEdge): Future[Boolean] = {
+ if (!predicate) {
+ Future.failed(new PartialFailureException(squashedEdge, 3, "predicate
failed."))
+ } else {
+ val p = Random.nextDouble()
+ if (p < FailProb) Future.failed(new
PartialFailureException(squashedEdge, 3, s"$p"))
+ else {
+ val releaseLockEdgePuts =
snapshotEdgeSerializer(releaseLockEdge).toKeyValues
+ writeToStorage(squashedEdge.label.hbaseZkAddr, releaseLockEdgePuts,
withWait = true).recoverWith {
+ case ex: Exception =>
+ logger.error(s"ReleaseLock RPC Failed.")
+ throw new PartialFailureException(squashedEdge, 3, "ReleaseLock
RPC Failed")
+ }.map { ret =>
+ if (ret) {
+ debug(ret, "releaseLock", squashedEdge.toSnapshotEdge)
+ } else {
+ val msg = Seq("\nFATAL ERROR\n",
+ "=" * 50,
+ squashedEdge.toLogString,
+ releaseLockEdgePuts,
+ "=" * 50,
+ "\n"
+ )
+ logger.error(msg.mkString("\n"))
+ // error(ret, "releaseLock", edge.toSnapshotEdge)
+ throw new PartialFailureException(squashedEdge, 3, "hbase fail.")
}
- Future.successful(false)
- } else {
- val future = fn(edges, statusCode)
- future.onSuccess {
- case success =>
- logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
+ true
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param predicate: indicates whether this commitIndexEdgeMutations phase should
proceed.
+ * @param statusCode: current statusCode of this thread to process edges.
+ * @param squashedEdge: squashed(in memory) final edge from input edges on
same snapshotEdge. only for debug
+ * @param edgeMutate: actual collection of mutations. note that edgeMutate
contains snapshotEdge mutations,
+ * but in here, we only use indexEdge's mutations.
+ * @return
+ */
+ protected def commitIndexEdgeMutations(predicate: Boolean,
+ statusCode: Byte,
+ squashedEdge: Edge,
+ edgeMutate: EdgeMutate): Future[Boolean] = {
+ if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1,
"predicate failed."))
+ else {
+ if (statusCode >= 2) {
+ logger.debug(s"skip mutate:
[$statusCode]\n${squashedEdge.toLogString}")
+ Future.successful(true)
+ } else {
+ val p = Random.nextDouble()
+ if (p < FailProb) Future.failed(new
PartialFailureException(squashedEdge, 1, s"$p"))
+ else
+ writeToStorage(squashedEdge.label.hbaseZkAddr,
indexedEdgeMutations(edgeMutate), withWait = true).map { ret =>
+ if (ret) {
+ debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate)
+ } else {
+ throw new PartialFailureException(squashedEdge, 1, "hbase fail.")
+ }
+ true
}
- future recoverWith {
- case FetchTimeoutException(retryEdge) =>
- logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
- retry(tryNum + 1)(edges, statusCode)(fn)
-
- case PartialFailureException(retryEdge, failedStatusCode,
faileReason) =>
- val status = failedStatusCode match {
- case 0 => "AcquireLock failed."
- case 1 => "Mutation failed."
- case 2 => "Increment failed."
- case 3 => "ReleaseLock failed."
- case 4 => "Unknown"
- }
+ }
+ }
+ }
- Thread.sleep(Random.nextInt(MaxBackOff))
- logger.info(s"[Try: $tryNum], [Status: $status] partial
fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
- retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn)
- case ex: Exception =>
- logger.error("Unknown exception", ex)
- Future.successful(false)
+ /**
+ *
+ * @param predicate: indicates whether this commitIndexEdgeDegreeMutations phase should
proceed.
+ * @param statusCode: current statusCode of this thread to process edges.
+ * @param squashedEdge: squashed(in memory) final edge from input edges on
same snapshotEdge. only for debug
+ * @param edgeMutate: actual collection of mutations. note that edgeMutate
contains snapshotEdge mutations,
+ * but in here, we only use indexEdge's degree mutations.
+ * @return
+ */
+ protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
+ statusCode: Byte,
+ squashedEdge: Edge,
+ edgeMutate: EdgeMutate): Future[Boolean] = {
+ if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2,
"predicate failed."))
+ if (statusCode >= 3) {
+ logger.debug(s"skip increment:
[$statusCode]\n${squashedEdge.toLogString}")
+ Future.successful(true)
+ } else {
+ val p = Random.nextDouble()
+ if (p < FailProb) Future.failed(new
PartialFailureException(squashedEdge, 2, s"$p"))
+ else
+ writeToStorage(squashedEdge.label.hbaseZkAddr, increments(edgeMutate),
withWait = true).map { ret =>
+ if (ret) {
+ debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
+ } else {
+ throw new PartialFailureException(squashedEdge, 2, "hbase fail.")
}
+ true
}
- }
- retry(1)(edges, 0)(commit)
}
}
+
+
+
+ /** end of methods for consistency */
+
def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
newEdge: Edge, edgeMutate: EdgeMutate) =
Seq("----------------------------------------------",
@@ -733,259 +1076,6 @@ abstract class Storage[R](val config: Config)(implicit
ec: ExecutionContext) {
// }
// }
- case class PartialFailureException(edge: Edge, statusCode: Byte, failReason:
String) extends Exception
-
- protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge)
= {
- val msg = Seq(s"[$ret] [$phase]",
s"${snapshotEdge.toLogString()}").mkString("\n")
- logger.debug(msg)
- }
-
- protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge,
edgeMutate: EdgeMutate) = {
- val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
- s"${edgeMutate.toLogString}").mkString("\n")
- logger.debug(msg)
- }
-
- protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge,
kvOpt: Option[SKeyValue]) = {
- val currentTs = System.currentTimeMillis()
- val lockTs = snapshotEdgeOpt match {
- case None => Option(currentTs)
- case Some(snapshotEdge) =>
- snapshotEdge.pendingEdgeOpt match {
- case None => Option(currentTs)
- case Some(pendingEdge) => pendingEdge.lockTs
- }
- }
- val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1
- // snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
- val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs =
lockTs)
- val base = snapshotEdgeOpt match {
- case None =>
- // no one ever mutated on this snapshotEdge.
- edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
- case Some(snapshotEdge) =>
- // there is at least one mutation have been succeed.
- snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt =
Option(pendingEdge))
- }
- base.copy(version = newVersion, statusCode = 1, lockTs = None)
- }
-
- protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge:
SnapshotEdge,
- edgeMutate: EdgeMutate) = {
- val newVersion = lockEdge.version + 1
- val base = edgeMutate.newSnapshotEdge match {
- case None =>
- // shouldReplace false
- assert(snapshotEdgeOpt.isDefined)
- snapshotEdgeOpt.get.toSnapshotEdge
- case Some(newSnapshotEdge) => newSnapshotEdge
- }
- base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None)
- }
-
- protected def acquireLock(statusCode: Byte,
- edge: Edge,
- oldSnapshotEdgeOpt: Option[Edge],
- lockEdge: SnapshotEdge,
- oldBytes: Array[Byte]): Future[Boolean] = {
- if (statusCode >= 1) {
- logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}")
- Future.successful(true)
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p")
- else {
- val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
- val oldPut = oldSnapshotEdgeOpt.map(e =>
snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
-// val lockEdgePut = buildPutAsync(lockEdge).head
-// val oldPut = oldSnapshotEdgeOpt.map(e =>
buildPutAsync(e.toSnapshotEdge).head)
- writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
- logger.error(s"AcquireLock RPC Failed.")
- throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed")
- }.map { ret =>
- if (ret) {
- val log = Seq(
- "\n",
- "=" * 50,
- s"[Success]: acquireLock",
- s"[RequestEdge]: ${edge.toLogString}",
- s"[LockEdge]: ${lockEdge.toLogString()}",
- s"[PendingEdge]:
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
- "=" * 50, "\n").mkString("\n")
-
- logger.debug(log)
- // debug(ret, "acquireLock", edge.toSnapshotEdge)
- } else {
- throw new PartialFailureException(edge, 0, "hbase fail.")
- }
- true
- }
- }
- }
- }
-
-
-
- protected def releaseLock(predicate: Boolean,
- edge: Edge,
- lockEdge: SnapshotEdge,
- releaseLockEdge: SnapshotEdge,
- _edgeMutate: EdgeMutate,
- oldBytes: Array[Byte]): Future[Boolean] = {
- if (!predicate) {
- throw new PartialFailureException(edge, 3, "predicate failed.")
- }
- val p = Random.nextDouble()
- if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p")
- else {
- val releaseLockEdgePut =
snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head
- val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
- writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith {
- case ex: Exception =>
- logger.error(s"ReleaseLock RPC Failed.")
- throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed")
- }.map { ret =>
- if (ret) {
- debug(ret, "releaseLock", edge.toSnapshotEdge)
- } else {
- val msg = Seq("\nFATAL ERROR\n",
- "=" * 50,
- oldBytes.toList,
- lockEdgePut,
- releaseLockEdgePut,
- // lockEdgePut.value.toList,
- // releaseLockEdgePut.value().toList,
- "=" * 50,
- "\n"
- )
- logger.error(msg.mkString("\n"))
- // error(ret, "releaseLock", edge.toSnapshotEdge)
- throw new PartialFailureException(edge, 3, "hbase fail.")
- }
- true
- }
- }
- Future.successful(true)
- }
-
-
- protected def mutate(predicate: Boolean,
- edge: Edge,
- statusCode: Byte,
- _edgeMutate: EdgeMutate): Future[Boolean] = {
- if (!predicate) throw new PartialFailureException(edge, 1, "predicate
failed.")
-
- if (statusCode >= 2) {
- logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}")
- Future.successful(true)
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p")
- else
- writeToStorage(edge.label.hbaseZkAddr,
indexedEdgeMutations(_edgeMutate), withWait = true).map { ret =>
- if (ret) {
- debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate)
- } else {
- throw new PartialFailureException(edge, 1, "hbase fail.")
- }
- true
- }
- }
- }
-
- protected def increment(predicate: Boolean,
- edge: Edge,
- statusCode: Byte, _edgeMutate: EdgeMutate):
Future[Boolean] = {
- if (!predicate) throw new PartialFailureException(edge, 2, "predicate
failed.")
- if (statusCode >= 3) {
- logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}")
- Future.successful(true)
- } else {
- val p = Random.nextDouble()
- if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p")
- else
- writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate),
withWait = true).map { ret =>
- if (ret) {
- debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate)
- } else {
- throw new PartialFailureException(edge, 2, "hbase fail.")
- }
- true
- }
- }
- }
-
-
- /** this may be overrided by specific storage implementation */
- protected def commitProcess(edge: Edge, statusCode: Byte)
- (snapshotEdgeOpt: Option[Edge], kvOpt:
Option[SKeyValue])
- (lockEdge: SnapshotEdge, releaseLockEdge:
SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = {
- val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte])
- for {
- locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge,
oldBytes)
- mutated <- mutate(locked, edge, statusCode, _edgeMutate)
- incremented <- increment(mutated, edge, statusCode, _edgeMutate)
- released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge,
_edgeMutate, oldBytes)
- } yield {
- released
- }
- }
-
- protected def commitUpdate(edge: Edge,
- statusCode: Byte)(snapshotEdgeOpt: Option[Edge],
- kvOpt: Option[SKeyValue],
- edgeUpdate: EdgeMutate):
Future[Boolean] = {
- val label = edge.label
- def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty)
-
- val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
- val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge,
edgeUpdate)
- val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_
- snapshotEdgeOpt match {
- case None =>
- // no one ever did success on acquire lock.
- _process(lockEdge, releaseLockEdge, edgeUpdate)
- // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
- case Some(snapshotEdge) =>
- // someone did success on acquire lock at least one.
- snapshotEdge.pendingEdgeOpt match {
- case None =>
- // not locked
- _process(lockEdge, releaseLockEdge, edgeUpdate)
- // process(lockEdge, releaseLockEdge, edgeUpdate,
statusCode)
- case Some(pendingEdge) =>
- def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration <
System.currentTimeMillis()
- if (isLockExpired) {
- val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts)
None else Option(snapshotEdge)
- val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge,
Seq(pendingEdge))
- val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge,
kvOpt)
- val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt,
newLockEdge, newEdgeUpdate)
- commitProcess(edge, statusCode = 0)(snapshotEdgeOpt,
kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret =>
- // process(newLockEdge, newReleaseLockEdge,
newEdgeUpdate, statusCode = 0).flatMap { ret =>
- val log = s"[Success]: Resolving expired pending
edge.\n${pendingEdge.toLogString}"
- throw new PartialFailureException(edge, 0, log)
- }
- } else {
- // locked
- if (pendingEdge.ts == edge.ts && statusCode > 0) {
- // self locked
- val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts)
None else Option(snapshotEdge)
- val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge,
Seq(edge))
- val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt,
lockEdge, newEdgeUpdate)
-
- /** lockEdge will be ignored */
- _process(lockEdge, newReleaseLockEdge, newEdgeUpdate)
- // process(lockEdge, newReleaseLockEdge,
newEdgeUpdate, statusCode)
- } else {
- throw new PartialFailureException(edge, statusCode,
s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]")
- }
- }
- }
- }
- }
-
- /** end of methods for consistency */
-
// def futureCache[T] = Cache[Long, (Long, T)]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cf06febe/s2rest_play/conf/test.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/test.conf b/s2rest_play/conf/test.conf
index 2325dd9..a16b926 100644
--- a/s2rest_play/conf/test.conf
+++ b/s2rest_play/conf/test.conf
@@ -18,4 +18,4 @@
#
max.retry.number=10000
-hbase.fail.prob=0.1
+hbase.fail.prob=0.05