http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
new file mode 100644
index 0000000..eab5cab
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -0,0 +1,438 @@
+package org.apache.s2graph.core.storage
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, NoStackException}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Random
+
+class WriteWriteConflictResolver(graph: S2Graph,
+                                 serDe: StorageSerDe,
+                                 io: StorageIO,
+                                 mutator: StorageWritable,
+                                 fetcher: StorageReadable[_]) {
+
+  val BackoffTimeout = graph.BackoffTimeout
+  val MaxRetryNum = graph.MaxRetryNum
+  val MaxBackOff = graph.MaxBackOff
+  val FailProb = graph.FailProb
+  val LockExpireDuration = graph.LockExpireDuration
+  val MaxSize = graph.MaxSize
+  val ExpireAfterWrite = graph.ExpireAfterWrite
+  val ExpireAfterAccess = graph.ExpireAfterAccess
+
+  /** retry scheduler */
+  val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
+
+  protected def exponentialBackOff(tryNum: Int) = {
+    // time slot is divided by 10 ms
+    val slot = 10
+    Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
+  }
+
+  def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (tryNum >= MaxRetryNum) {
+      edges.foreach { edge =>
+        logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
+      }
+
+      Future.successful(false)
+    } else {
+      val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt)
+      future.onSuccess {
+        case success =>
+          logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
+      }
+      future recoverWith {
+        case FetchTimeoutException(retryEdge) =>
+          logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
+          /* fetch failed. re-fetch should be done */
+          fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+            retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
+          }
+
+        case PartialFailureException(retryEdge, failedStatusCode, failReason) =>
+          val status = failedStatusCode match {
+            case 0 => "AcquireLock failed."
+            case 1 => "Mutation failed."
+            case 2 => "Increment failed."
+            case 3 => "ReleaseLock failed."
+            case 4 => "Unknown"
+          }
+          logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${failReason}")
+
+          /* retry logic */
+          val promise = Promise[Boolean]
+          val backOff = exponentialBackOff(tryNum)
+          scheduledThreadPool.schedule(new Runnable {
+            override def run(): Unit = {
+              val future = if (failedStatusCode == 0) {
+                // acquiring the lock failed. someone else is mutating, so this thread needs to re-fetch the snapshotEdge.
+                /* fetch failed. re-fetch should be done */
+                fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+                  retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
+                }
+              } else {
+                // partial failure occurred while self locked and mutating.
+                //            assert(fetchedSnapshotEdgeOpt.nonEmpty)
+                retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt)
+              }
+              }
+              promise.completeWith(future)
+            }
+
+          }, backOff, TimeUnit.MILLISECONDS)
+          promise.future
+
+        case ex: Exception =>
+          logger.error("Unknown exception", ex)
+          Future.successful(false)
+      }
+    }
+  }
+
+  protected def commitUpdate(edges: Seq[S2Edge],
+                             statusCode: Byte,
+                             fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = {
+    //    Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
+    assert(edges.nonEmpty)
+    //    assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
+
+    statusCode match {
+      case 0 =>
+        fetchedSnapshotEdgeOpt match {
+          case None =>
+            /*
+             * no one has ever mutated this SN.
+             * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges)
+             * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1)
+             * lock = (squashedEdge, pendingE)
+             * releaseLock = (edgeMutate.newSnapshotEdge, None)
+             */
+            val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+
+            assert(edgeMutate.newSnapshotEdge.isDefined)
+
+            val lockTs = Option(System.currentTimeMillis())
+            val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1)
+            val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+            val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+              pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
+
+            commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+
+          case Some(snapshotEdge) =>
+            snapshotEdge.pendingEdgeOpt match {
+              case None =>
+                /*
+                 * others finished commit on this SN, but there is no contention.
+                 * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges)
+                 * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ?
+                 * lock = (snapshotEdge, pendingE)
+                 * releaseLock = (edgeMutate.newSnapshotEdge, None)
+                 */
+                val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+                if (edgeMutate.newSnapshotEdge.isEmpty) {
+                  logger.debug(s"drop these requests: \n${edges.map(_.toLogString).mkString("\n")}")
+                  Future.successful(true)
+                } else {
+                  val lockTs = Option(System.currentTimeMillis())
+                  val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
+                  val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+                  val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+                    pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
+                  commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+                }
+              case Some(pendingEdge) =>
+                val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
+                if (isLockExpired) {
+                  /*
+                   * if pendingEdge.ts == snapshotEdge.ts =>
+                   *    (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge))
+                   * else =>
+                   *    (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge))
+                   * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1)
+                   * lock = (snapshotEdge, pendingE)
+                   * releaseLock = (edgeMutate.newSnapshotEdge, None)
+                   */
+                  logger.debug(s"${pendingEdge.toLogString} has expired.")
+                  val (squashedEdge, edgeMutate) =
+                    if (pendingEdge.ts == snapshotEdge.ts) S2Edge.buildOperation(None, pendingEdge +: edges)
+                    else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges)
+
+                  val lockTs = Option(System.currentTimeMillis())
+                  val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
+                  val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge))
+                  val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
+                    pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
+
+                  commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+                } else {
+                  /*
+                   * others finished commit on this SN and there is currently contention.
+                   * this can't proceed, so retry from re-fetch.
+                   * throw EX
+                   */
+                  val (squashedEdge, _) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+                  Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]"))
+                }
+            }
+
+        }
+      case _ =>
+
+        /*
+         * statusCode > 0, which means self locked and there has been a partial failure on either mutate, increment, or releaseLock
+         */
+
+        /*
+         * this thread succeeded in locking this SN. keep going with the commit process.
+         * if SN.isEmpty =>
+         * no one has ever succeeded in committing on this SN.
+         * this is the first mutation try on this SN.
+         * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
+         * else =>
+         * assert(SN.pendingEdgeOpt.isEmpty) no-fetch after acquiring the lock when self retrying.
+         * there has been a successful commit on this SN.
+         * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges)
+         * releaseLock = (edgeMutate.newSnapshotEdge, None)
+         */
+        val _edges =
+          if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
+          else edges
+        val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
+        val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
+        val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
+          case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
+          case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
+        }
+        // lockSnapshotEdge will be ignored.
+        commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate)
+    }
+  }
+
+  /**
+    * orchestrate commit process.
+    * we separate it into 4 steps to avoid duplicating each step over and over.
+    *
+    * @param statusCode              : current statusCode of this thread to process edges.
+    * @param squashedEdge            : squashed(in memory) final edge from input edges on same snapshotEdge.
+    * @param fetchedSnapshotEdgeOpt  : fetched snapshotEdge from storage before the commit process begins.
+    * @param lockSnapshotEdge        : lockEdge that holds the data necessary to lock this snapshotEdge for this thread.
+    * @param releaseLockSnapshotEdge : releaseLockEdge that will remove the lock by storing the new final merged states
+    *                                all from the current request edges and the fetched snapshotEdge.
+    * @param edgeMutate              : mutations for indexEdge and snapshotEdge.
+    * @return
+    */
+  protected def commitProcess(statusCode: Byte,
+                              squashedEdge: S2Edge,
+                              fetchedSnapshotEdgeOpt: Option[S2Edge],
+                              lockSnapshotEdge: SnapshotEdge,
+                              releaseLockSnapshotEdge: SnapshotEdge,
+                              edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
+    for {
+      locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge)
+      mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge, edgeMutate)
+      incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode, squashedEdge, edgeMutate)
+      lockReleased <- releaseLock(incremented, statusCode, squashedEdge, releaseLockSnapshotEdge)
+    } yield lockReleased
+  }
+
+  case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason)
+
+  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = {
+    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n")
+    logger.debug(msg)
+  }
+
+  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = {
+    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
+      s"${edgeMutate.toLogString}").mkString("\n")
+    logger.debug(msg)
+  }
+
+  /**
+    * try to acquire lock on storage for this given snapshotEdge(lockEdge).
+    *
+    * @param statusCode             : current statusCode of this thread to process edges.
+    * @param squashedEdge           : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
+    * @param fetchedSnapshotEdgeOpt : fetched snapshot edge from storage.
+    * @param lockEdge               : lockEdge to build RPC request(compareAndSet) into Storage.
+    * @return
+    */
+  protected def acquireLock(statusCode: Byte,
+                            squashedEdge: S2Edge,
+                            fetchedSnapshotEdgeOpt: Option[S2Edge],
+                            lockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (statusCode >= 1) {
+      logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) {
+        Future.failed(new PartialFailureException(squashedEdge, 0, s"$p"))
+      } else {
+        val lockEdgePut = serDe.snapshotEdgeSerializer(lockEdge).toKeyValues.head
+        val oldPut = fetchedSnapshotEdgeOpt.map(e => serDe.snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
+        mutator.writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
+          logger.error(s"AcquireLock RPC Failed.")
+          throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC Failed")
+        }.map { ret =>
+          if (ret.isSuccess) {
+            val log = Seq(
+              "\n",
+              "=" * 50,
+              s"[Success]: acquireLock",
+              s"[RequestEdge]: ${squashedEdge.toLogString}",
+              s"[LockEdge]: ${lockEdge.toLogString()}",
+              s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
+              "=" * 50, "\n").mkString("\n")
+
+            logger.debug(log)
+            //            debug(ret, "acquireLock", edge.toSnapshotEdge)
+          } else {
+            throw new PartialFailureException(squashedEdge, 0, "hbase fail.")
+          }
+          true
+        }
+      }
+    }
+  }
+
+
+  /**
+    * change this snapshot's state on storage from locked into committed by
+    * storing the new merged state on storage. the merged state comes from releaseLockEdge.
+    * note that releaseLock returns Future.failed on predicate failure.
+    *
+    * @param predicate       : indicates whether this releaseLock phase should proceed or not.
+    * @param statusCode      : releaseLock does not use statusCode, only for debug.
+    * @param squashedEdge    : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
+    * @param releaseLockEdge : final merged state if the whole process goes well.
+    * @return
+    */
+  protected def releaseLock(predicate: Boolean,
+                            statusCode: Byte,
+                            squashedEdge: S2Edge,
+                            releaseLockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (!predicate) {
+      Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed."))
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p"))
+      else {
+        val releaseLockEdgePuts = serDe.snapshotEdgeSerializer(releaseLockEdge).toKeyValues
+        mutator.writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith {
+          case ex: Exception =>
+            logger.error(s"ReleaseLock RPC Failed.")
+            throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed")
+        }.map { ret =>
+          if (ret.isSuccess) {
+            debug(ret.isSuccess, "releaseLock", squashedEdge.toSnapshotEdge)
+          } else {
+            val msg = Seq("\nFATAL ERROR\n",
+              "=" * 50,
+              squashedEdge.toLogString,
+              releaseLockEdgePuts,
+              "=" * 50,
+              "\n"
+            )
+            logger.error(msg.mkString("\n"))
+            //          error(ret, "releaseLock", edge.toSnapshotEdge)
+            throw new PartialFailureException(squashedEdge, 3, "hbase fail.")
+          }
+          true
+        }
+      }
+    }
+  }
+
+  /**
+    *
+    * @param predicate    : indicates whether this commitIndexEdgeMutations phase should proceed or not.
+    * @param statusCode   : current statusCode of this thread to process edges.
+    * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
+    * @param edgeMutate   : actual collection of mutations. note that edgeMutate contains snapshotEdge mutations,
+    *                     but here we only use indexEdge's mutations.
+    * @return
+    */
+  protected def commitIndexEdgeMutations(predicate: Boolean,
+                                         statusCode: Byte,
+                                         squashedEdge: S2Edge,
+                                         edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed."))
+    else {
+      if (statusCode >= 2) {
+        logger.debug(s"skip mutate: [$statusCode]\n${squashedEdge.toLogString}")
+        Future.successful(true)
+      } else {
+        val p = Random.nextDouble()
+        if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p"))
+        else
+          mutator.writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, io.indexedEdgeMutations(edgeMutate), withWait = true).map { ret =>
+            if (ret.isSuccess) {
+              debug(ret.isSuccess, "mutate", squashedEdge.toSnapshotEdge, edgeMutate)
+            } else {
+              throw new PartialFailureException(squashedEdge, 1, "hbase fail.")
+            }
+            true
+          }
+      }
+    }
+  }
+
+  /**
+    *
+    * @param predicate    : indicates whether this commitIndexEdgeDegreeMutations phase should proceed or not.
+    * @param statusCode   : current statusCode of this thread to process edges.
+    * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug
+    * @param edgeMutate   : actual collection of mutations. note that edgeMutate contains snapshotEdge mutations,
+    *                     but here we only use indexEdge's degree mutations.
+    * @return
+    */
+  protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
+                                               statusCode: Byte,
+                                               squashedEdge: S2Edge,
+                                               edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
+
+    def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
+      mutator.writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, kvs, withWait = withWait).map { ret =>
+        if (ret.isSuccess) {
+          debug(ret.isSuccess, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
+        } else {
+          throw new PartialFailureException(squashedEdge, 2, "hbase fail.")
+        }
+        true
+      }
+    }
+
+    if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, "predicate failed."))
+    else if (statusCode >= 3) {
+      logger.debug(s"skip increment: [$statusCode]\n${squashedEdge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p"))
+      else {
+        val (bufferIncr, nonBufferIncr) = io.increments(edgeMutate.deepCopy)
+
+        if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false)
+        _write(nonBufferIncr, withWait = true)
+      }
+    }
+  }
+
+  /** end of methods for consistency */
+
+  def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge],
+                newEdge: S2Edge, edgeMutate: EdgeMutate) =
+    Seq("----------------------------------------------",
+      s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
+      s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
+      s"newEdge: ${newEdge.toLogString}",
+      s"mutation: \n${edgeMutate.toLogString}",
+      "----------------------------------------------").mkString("\n")
+
+}

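Note on usage: callers enter the resolver through retry, which drives commitUpdate and the four commit phases below it. A minimal, hypothetical sketch of such a call, assuming a WriteWriteConflictResolver instance and an ExecutionContext are already in scope (commitWithRetry is illustrative, not part of this patch):

import org.apache.s2graph.core.S2Edge
import org.apache.s2graph.core.storage.WriteWriteConflictResolver

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: commit a batch of edges that resolve to one snapshot edge.
def commitWithRetry(resolver: WriteWriteConflictResolver, edges: Seq[S2Edge])
                   (implicit ec: ExecutionContext): Future[Boolean] =
  // tryNum = 0 and statusCode = 0: nothing locked or mutated yet;
  // fetchedSnapshotEdgeOpt = None: commitUpdate treats the snapshot edge as unseen,
  // and any conflict surfaces as a PartialFailureException that retry backs off on.
  resolver.retry(0)(edges, statusCode = 0, fetchedSnapshotEdgeOpt = None)
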
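The retry scheduler above sleeps for a randomized, exponentially growing delay between attempts, bounded by BackoffTimeout. A standalone sketch of that calculation, assuming an illustrative BackoffTimeout of 1000 ms and the 10 ms slot used in exponentialBackOff:

import scala.util.Random

object BackoffSketch {
  val BackoffTimeout = 1000 // ms; assumed value, the real one comes from graph.BackoffTimeout
  val Slot = 10             // ms; matches the 10 ms time slot above

  // random delay in [0, min(BackoffTimeout, Slot * 2^tryNum))
  def exponentialBackOff(tryNum: Int): Int =
    Random.nextInt(Math.min(BackoffTimeout, Slot * Math.pow(2, tryNum)).toInt)

  def main(args: Array[String]): Unit =
    // bounds grow 10, 20, 40, ... ms and are capped at 1000 ms from tryNum = 7 onwards
    (0 to 8).foreach { n =>
      val bound = Math.min(BackoffTimeout, Slot * Math.pow(2, n)).toInt
      println(s"try $n: bound=$bound ms, sampled=${exponentialBackOff(n)} ms")
    }
}
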
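commitProcess chains four phases (acquireLock, commitIndexEdgeMutations, commitIndexEdgeDegreeMutations, releaseLock), and the statusCode carried in PartialFailureException tells a later retry which phases it can skip: acquireLock when statusCode >= 1, index mutations when >= 2, degree increments when >= 3, while releaseLock always runs. A small sketch of that resume rule, independent of the storage types in the patch:

object ResumeSketch {
  // each phase paired with the statusCode from which it is skipped on retry
  private val phases = Seq(
    (1, "acquireLock"),
    (2, "commitIndexEdgeMutations"),
    (3, "commitIndexEdgeDegreeMutations"),
    (4, "releaseLock") // never skipped for statusCode 0..3
  )

  def phasesToRun(statusCode: Byte): Seq[String] =
    phases.collect { case (skipFrom, name) if statusCode < skipFrom => name }

  def main(args: Array[String]): Unit =
    (0 to 3).foreach { sc =>
      println(s"statusCode=$sc resumes with: ${phasesToRun(sc.toByte).mkString(" -> ")}")
    }
}
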
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 4fb2240..54007d5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -20,38 +20,18 @@
 package org.apache.s2graph.core.storage.hbase
 
 
-
 import java.util
-import java.util.Base64
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors}
 
-import com.stumbleupon.async.{Callback, Deferred}
 import com.typesafe.config.Config
 import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability}
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, 
ScanWithRange}
-import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.AsyncRPC
 import org.apache.s2graph.core.utils._
-import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
 import org.hbase.async._
-
+import org.apache.s2graph.core.storage.serde._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
-import scala.util.control.NonFatal
-import scala.util.hashing.MurmurHash3
 
 
 object AsynchbaseStorage {
@@ -92,6 +72,7 @@ object AsynchbaseStorage {
   }
 
   case class ScanWithRange(scan: Scanner, offset: Int, limit: Int)
+
   type AsyncRPC = Either[GetRequest, ScanWithRange]
 
   def initLocalHBase(config: Config,
@@ -159,734 +140,29 @@ object AsynchbaseStorage {
 
 
 class AsynchbaseStorage(override val graph: S2Graph,
-                        override val config: Config)(implicit ec: 
ExecutionContext)
-  extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) {
-
-  import Extensions.DeferOps
-
-  val hbaseExecutor: ExecutorService  =
-    if (config.getString("hbase.zookeeper.quorum") == "localhost")
-      AsynchbaseStorage.initLocalHBase(config)
-    else
-      null
-
-  /**
-   * Asynchbase client setup.
-   * note that we need two client, one for bulk(withWait=false) and another 
for withWait=true
-   */
-  private val clientFlushInterval = 
config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
-
-  /**
-   * since some runtime environment such as spark cluster has issue with guava 
version, that is used in Asynchbase.
-   * to fix version conflict, make this as lazy val for clients that don't 
require hbase client.
-   */
-  lazy val client = AsynchbaseStorage.makeClient(config)
-  lazy val clientWithFlush = AsynchbaseStorage.makeClient(config, 
"hbase.rpcs.buffered_flush_interval" -> "0")
-  lazy val clients = Seq(client, clientWithFlush)
-
-  private val emptyKeyValues = new util.ArrayList[KeyValue]()
-  private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
-  private val emptyStepResult = new util.ArrayList[StepResult]()
-
-  private def client(withWait: Boolean): HBaseClient = if (withWait) 
clientWithFlush else client
-
-  import CanDefer._
-
-  /** Future Cache to squash request */
-  lazy private val futureCache = new DeferCache[StepResult, Deferred, 
Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
-
-  /** Simple Vertex Cache */
-  lazy private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, 
Future](config, Seq.empty[SKeyValue])
-
-  private val zkQuorum = config.getString("hbase.zookeeper.quorum")
-  private val zkQuorumSlave =
-    if (config.hasPath("hbase.slave.zookeeper.quorum")) 
Option(config.getString("hbase.slave.zookeeper.quorum"))
-    else None
-
-  /** v4 max next row size */
-  private val v4_max_num_rows = 10000
-  private def getV4MaxNumRows(limit : Int): Int = {
-    if (limit < v4_max_num_rows) limit
-    else v4_max_num_rows
-  }
-
-  /**
-   * fire rpcs into proper hbase cluster using client and
-   * return true on all mutation success. otherwise return false.
-   */
-  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean): Future[Boolean] = {
-    if (kvs.isEmpty) Future.successful(true)
-    else {
-      val _client = client(withWait)
-      val (increments, putAndDeletes) = kvs.partition(_.operation == 
SKeyValue.Increment)
-
-      /* Asynchbase IncrementRequest does not implement HasQualifiers */
-      val incrementsFutures = increments.map { kv =>
-        val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, 
kv.qualifier, Bytes.toLong(kv.value))
-        val defer = _client.atomicIncrement(inc)
-        val future = defer.toFuture(Long.box(0)).map(_ => true).recover { case 
ex: Exception =>
-          logger.error(s"mutation failed. $kv", ex)
-          false
-        }
-        if (withWait) future else Future.successful(true)
-      }
-
-      /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
-      val othersFutures = putAndDeletes.groupBy { kv =>
-        (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
-      }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) 
=>
-
-        val durability = groupedKeyValues.head.durability
-        val qualifiers = new ArrayBuffer[Array[Byte]]()
-        val values = new ArrayBuffer[Array[Byte]]()
-
-        groupedKeyValues.foreach { kv =>
-          if (kv.qualifier != null) qualifiers += kv.qualifier
-          if (kv.value != null) values += kv.value
-        }
-        val defer = operation match {
-          case SKeyValue.Put =>
-            val put = new PutRequest(table.toArray, row.toArray, cf.toArray, 
qualifiers.toArray, values.toArray, timestamp)
-            put.setDurable(durability)
-            _client.put(put)
-          case SKeyValue.Delete =>
-            val delete =
-              if (qualifiers.isEmpty)
-                new DeleteRequest(table.toArray, row.toArray, cf.toArray, 
timestamp)
-              else
-                new DeleteRequest(table.toArray, row.toArray, cf.toArray, 
qualifiers.toArray, timestamp)
-            delete.setDurable(durability)
-            _client.delete(delete)
-        }
-        if (withWait) {
-          defer.toFuture(new AnyRef()).map(_ => true).recover { case ex: 
Exception =>
-            groupedKeyValues.foreach { kv => logger.error(s"mutation failed. 
$kv", ex) }
-            false
-          }
-        } else Future.successful(true)
-      }
-      for {
-        incrementRets <- Future.sequence(incrementsFutures)
-        otherRets <- Future.sequence(othersFutures)
-      } yield (incrementRets ++ otherRets).forall(identity)
-    }
-  }
-
-  private def fetchKeyValues(rpc: AsyncRPC): Future[Seq[SKeyValue]] = {
-    val defer = fetchKeyValuesInner(rpc)
-    defer.toFuture(emptyKeyValues).map { kvsArr =>
-      kvsArr.map { kv =>
-        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-      }
-    }
-  }
-
-  override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): 
Future[Seq[SKeyValue]] = {
-    val edge = toRequestEdge(queryRequest, Nil)
-    val rpc = buildRequest(queryRequest, edge)
-
-    fetchKeyValues(rpc)
-  }
-
-  /**
-   * since HBase natively provide CheckAndSet on storage level, implementation 
becomes simple.
-   * @param rpc: key value that is need to be stored on storage.
-   * @param expectedOpt: last valid value for rpc's KeyValue.value from 
fetching.
-   * @return return true if expected value matches and our rpc is successfully 
applied, otherwise false.
-   *         note that when some other thread modified same cell and have 
different value on this KeyValue,
-   *         then HBase atomically return false.
-   */
-  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): 
Future[Boolean] = {
-    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, 
rpc.value, rpc.timestamp)
-    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
-    client(withWait = true).compareAndSet(put, 
expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
-  }
-
-
-  /**
-   * given queryRequest, build storage specific RPC Request.
-   * In HBase case, we either build Scanner or GetRequest.
-   *
-   * IndexEdge layer:
-   *    Tall schema(v4): use scanner.
-   *    Wide schema(label's schema version in v1, v2, v3): use GetRequest with 
columnRangeFilter
-   *                                                       when query is given 
with itnerval option.
-   * SnapshotEdge layer:
-   *    Tall schema(v3, v4): use GetRequest without column filter.
-   *    Wide schema(label's schema version in v1, v2): use GetRequest with 
columnRangeFilter.
-   * Vertex layer:
-   *    all version: use GetRequest without column filter.
-   * @param queryRequest
-   * @return Scanner or GetRequest with proper setup with StartKey, EndKey, 
RangeFilter.
-   */
-  override def buildRequest(queryRequest: QueryRequest, edge: S2Edge): 
AsyncRPC = {
-    import Serializable._
-    val queryParam = queryRequest.queryParam
-    val label = queryParam.label
-
-    val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-      val snapshotEdge = edge.toSnapshotEdge
-      snapshotEdgeSerializer(snapshotEdge)
-    } else {
-      val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
-      indexEdgeSerializer(indexEdge)
-    }
-
-    val rowKey = serializer.toRowKey
-    val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
-
-    val (intervalMaxBytes, intervalMinBytes) = 
queryParam.buildInterval(Option(edge))
-
-    label.schemaVersion match {
-      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
-        val scanner = AsynchbasePatcher.newScanner(client, 
label.hbaseTableName)
-        scanner.setFamily(edgeCf)
-
-        /*
-         * TODO: remove this part.
-         */
-        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => 
edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
-        val indexEdge = indexEdgeOpt.getOrElse(throw new 
RuntimeException(s"Can`t find index for query $queryParam"))
-
-        val srcIdBytes = 
VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-        val labelWithDirBytes = indexEdge.labelWithDir.bytes
-        val labelIndexSeqWithIsInvertedBytes = 
StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, 
isInverted = false)
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, 
labelIndexSeqWithIsInvertedBytes)
-
-        val (startKey, stopKey) =
-          if (queryParam.intervalOpt.isDefined) {
-            // interval is set.
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Base64.getDecoder.decode(cursor)
-              case None => Bytes.add(baseKey, intervalMaxBytes)
-            }
-            (_startKey , Bytes.add(baseKey, intervalMinBytes))
-          } else {
-             /*
-              * note: since propsToBytes encode size of property map at first 
byte, we are sure about max value here
-              */
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Base64.getDecoder.decode(cursor)
-              case None => baseKey
-            }
-            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
-          }
-
-        scanner.setStartKey(startKey)
-        scanner.setStopKey(stopKey)
-
-        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: 
$queryParam")
-
-        scanner.setMaxVersions(1)
-        // TODO: exclusive condition innerOffset with cursorOpt
-        if (queryParam.cursorOpt.isDefined) {
-          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
-        } else {
-          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + 
queryParam.innerLimit))
-        }
-        scanner.setMaxTimestamp(maxTs)
-        scanner.setMinTimestamp(minTs)
-        scanner.setRpcTimeout(queryParam.rpcTimeout)
-
-        // SET option for this rpc properly.
-        if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, 
queryParam.limit))
-        else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + 
queryParam.innerLimit))
-
-      case _ =>
-        val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, 
serializer.toQualifier)
-        } else {
-          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
-        }
-
-        get.maxVersions(1)
-        get.setFailfast(true)
-        get.setMinTimestamp(minTs)
-        get.setMaxTimestamp(maxTs)
-        get.setTimeout(queryParam.rpcTimeout)
-
-        val pagination = new ColumnPaginationFilter(queryParam.limit, 
queryParam.offset)
-        val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
-          new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
-        }
-        get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, 
MUST_PASS_ALL))
-        Left(get)
-    }
-  }
+                        override val config: Config) extends Storage[AsyncRPC](graph, config) {
 
   /**
-   * we are using future cache to squash requests into same key on storage.
-   *
-   * @param queryRequest
-   * @param isInnerCall
-   * @param parentEdges
-   * @return we use Deferred here since it has much better performrance 
compared to scala.concurrent.Future.
-   *         seems like map, flatMap on scala.concurrent.Future is slower than 
Deferred's addCallback
-   */
-  override def fetch(queryRequest: QueryRequest,
-                     isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[StepResult] = {
-
-    def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
-      val prevStepScore = queryRequest.prevStepScore
-      val fallbackFn: (Exception => StepResult) = { ex =>
-        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        StepResult.Failure
-      }
-
-      val queryParam = queryRequest.queryParam
-      
fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs 
=>
-        val (startOffset, len) = queryParam.label.schemaVersion match {
-          case HBaseType.VERSION4 =>
-            val offset = if (queryParam.cursorOpt.isDefined) 0 else 
queryParam.offset
-            (offset, queryParam.limit)
-          case _ => (0, kvs.length)
-        }
-
-        toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, 
startOffset, len)
-      }
-    }
-
-    val queryParam = queryRequest.queryParam
-    val cacheTTL = queryParam.cacheTTLInMillis
-    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise 
GetRequest. */
+    * since some runtime environments, such as a spark cluster, have issues with the guava version that is used in Asynchbase.
+    * to fix the version conflict, make this a lazy val for clients that don't require an hbase client.
+    */
+  val client = AsynchbaseStorage.makeClient(config)
+  val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
+  val clients = Seq(client, clientWithFlush)
 
-    val edge = toRequestEdge(queryRequest, parentEdges)
-    val request = buildRequest(queryRequest, edge)
+  override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
 
-    val (intervalMaxBytes, intervalMinBytes) = 
queryParam.buildInterval(Option(edge))
-    val requestCacheKey = Bytes.add(toCacheKeyBytes(request), 
intervalMaxBytes, intervalMinBytes)
+  override val mutator: StorageWritable = new AsynchbaseStorageWritable(client, clientWithFlush)
 
-    if (cacheTTL <= 0) fetchInner(request)
-    else {
-      val cacheKeyBytes = 
Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+  override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
-//      val cacheKeyBytes = toCacheKeyBytes(request)
-      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
-    }
-  }
-
-  override def fetches(queryRequests: Seq[QueryRequest],
-                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): 
Future[Seq[StepResult]] = {
-    val defers: Seq[Deferred[StepResult]] = for {
-      queryRequest <- queryRequests
-    } yield {
-        val queryOption = queryRequest.query.queryOption
-        val queryParam = queryRequest.queryParam
-        val shouldBuildParents = queryOption.returnTree || 
queryParam.whereHasParent
-        val parentEdges = if (shouldBuildParents) 
prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
-        fetch(queryRequest, isInnerCall = false, parentEdges)
-      }
-
-    val grouped: Deferred[util.ArrayList[StepResult]] = 
Deferred.groupInOrder(defers)
-    grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
-      queryResults.toSeq
-    }.toFuture(emptyStepResult)
-  }
-
-
-  def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] = {
-    val edge = toRequestEdge(request, Nil)
-    fetchKeyValues(buildRequest(request, edge))
-  }
-
-
-  def fetchVertexKeyValues(request: AsyncRPC): Future[Seq[SKeyValue]] = 
fetchKeyValues(request)
+  override val fetcher: StorageReadable[AsyncRPC] = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
 
-  /**
-   * when withWait is given, we use client with flushInterval set to 0.
-   * if we are not using this, then we are adding extra wait time as much as 
flushInterval in worst case.
-   *
-   * @param edges
-   * @param withWait
-   * @return
-   */
-  override def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long, Long)]] = {
-
-    val _client = client(withWait)
-    val defers: Seq[Deferred[(Boolean, Long, Long)]] = for {
-      edge <- edges
-    } yield {
-        val futures: List[Deferred[(Boolean, Long, Long)]] = for {
-          relEdge <- edge.relatedEdges
-          edgeWithIndex <- 
EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
-        } yield {
-          val countWithTs = edge.propertyValueInner(LabelMeta.count)
-          val countVal = countWithTs.innerVal.toString().toLong
-          val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
-          val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, 
kv.qualifier, Bytes.toLong(kv.value))
-          val fallbackFn: (Exception => (Boolean, Long, Long)) = { ex =>
-            logger.error(s"mutation failed. $request", ex)
-            (false, -1L, -1L)
-          }
-          val defer = 
_client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { 
resultCount: java.lang.Long =>
-            (true, resultCount.longValue(), countVal)
-          }
-          if (withWait) defer
-          else Deferred.fromResult((true, -1L, -1L))
-        }
-
-        val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = 
Deferred.group(futures)
-        grouped.map(new util.ArrayList[(Boolean, Long, Long)]()) { resultLs => 
resultLs.head }
-      }
-
-    val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = 
Deferred.groupInOrder(defers)
-    grouped.toFuture(new util.ArrayList[(Boolean, Long, Long)]()).map(_.toSeq)
-  }
-
-
-  override def flush(): Unit = clients.foreach { client =>
-    super.flush()
-    val timeout = Duration((clientFlushInterval + 10) * 20, 
duration.MILLISECONDS)
-    Await.result(client.flush().toFuture(new AnyRef), timeout)
-  }
+  //  val hbaseExecutor: ExecutorService  =
+  //    if (config.getString("hbase.zookeeper.quorum") == "localhost")
+  //      AsynchbaseStorage.initLocalHBase(config)
+  //    else
+  //      null
 
 
-  override def shutdown(): Unit = {
-    flush()
-    clients.foreach { client =>
-      AsynchbaseStorage.shutdown(client)
-    }
-    if (hbaseExecutor != null) {
-      hbaseExecutor.shutdown()
-      hbaseExecutor.awaitTermination(1, TimeUnit.MINUTES)
-    }
-  }
-
-  override def createTable(_zkAddr: String,
-                           tableName: String,
-                           cfs: List[String],
-                           regionMultiplier: Int,
-                           ttl: Option[Int],
-                           compressionAlgorithm: String,
-                           replicationScopeOpt: Option[Int] = None,
-                           totalRegionCount: Option[Int] = None): Unit = {
-    /* TODO: Decide if we will allow each app server to connect to multiple 
hbase cluster */
-    for {
-      zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
-    } {
-      logger.info(s"create table: $tableName on $zkAddr, $cfs, 
$regionMultiplier, $compressionAlgorithm")
-      withAdmin(zkAddr) { admin =>
-        val regionCount = 
totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * 
regionMultiplier)
-        try {
-          if (!admin.tableExists(TableName.valueOf(tableName))) {
-            val desc = new HTableDescriptor(TableName.valueOf(tableName))
-            desc.setDurability(Durability.ASYNC_WAL)
-            for (cf <- cfs) {
-              val columnDesc = new HColumnDescriptor(cf)
-                
.setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-                .setBloomFilterType(BloomType.ROW)
-                .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-                .setMaxVersions(1)
-                .setTimeToLive(2147483647)
-                .setMinVersions(0)
-                .setBlocksize(32768)
-                .setBlockCacheEnabled(true)
-                // FIXME: For test!!
-                .setInMemory(true)
-              if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-              if (replicationScopeOpt.isDefined) 
columnDesc.setScope(replicationScopeOpt.get)
-              desc.addFamily(columnDesc)
-            }
-
-            if (regionCount <= 1) admin.createTable(desc)
-            else admin.createTable(desc, getStartKey(regionCount), 
getEndKey(regionCount), regionCount)
-          } else {
-            logger.info(s"$zkAddr, $tableName, $cfs already exist.")
-          }
-        } catch {
-          case e: Throwable =>
-            logger.error(s"$zkAddr, $tableName failed with $e", e)
-            throw e
-        }
-      }
-    }
-  }
-
-  override def truncateTable(zkAddr: String, tableNameStr: String): Unit = {
-    withAdmin(zkAddr) { admin =>
-      val tableName = TableName.valueOf(tableNameStr)
-      if (!Try(admin.tableExists(tableName)).getOrElse(false)) {
-        logger.info(s"No table to truncate ${tableNameStr}")
-        return
-      }
-
-      Try(admin.isTableDisabled(tableName)).map {
-        case true =>
-          logger.info(s"${tableNameStr} is already disabled.")
-
-        case false =>
-          logger.info(s"Before disabling to trucate ${tableNameStr}")
-          Try(admin.disableTable(tableName)).recover {
-            case NonFatal(e) =>
-              logger.info(s"Failed to disable ${tableNameStr}: ${e}")
-          }
-          logger.info(s"After disabling to trucate ${tableNameStr}")
-      }
-
-      logger.info(s"Before truncating ${tableNameStr}")
-      Try(admin.truncateTable(tableName, true)).recover {
-        case NonFatal(e) =>
-          logger.info(s"Failed to truncate ${tableNameStr}: ${e}")
-      }
-      logger.info(s"After truncating ${tableNameStr}")
-      Try(admin.close()).recover {
-        case NonFatal(e) =>
-          logger.info(s"Failed to close admin ${tableNameStr}: ${e}")
-      }
-      Try(admin.getConnection.close()).recover {
-        case NonFatal(e) =>
-          logger.info(s"Failed to close connection ${tableNameStr}: ${e}")
-      }
-    }
-  }
-
-  override def deleteTable(zkAddr: String, tableNameStr: String): Unit = {
-    withAdmin(zkAddr) { admin =>
-      val tableName = TableName.valueOf(tableNameStr)
-      if (!admin.tableExists(tableName)) {
-        return
-      }
-      if (admin.isTableEnabled(tableName)) {
-        admin.disableTable(tableName)
-      }
-      admin.deleteTable(tableName)
-    }
-  }
-
-  /** Asynchbase implementation override default getVertices to use future 
Cache */
-  override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
-    def fromResult(kvs: Seq[SKeyValue],
-                   version: String): Option[S2Vertex] = {
-      if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(kvs, None)
-    }
-
-    val futures = vertices.map { vertex =>
-      val kvs = vertexSerializer(vertex).toKeyValues
-      val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, 
Serializable.vertexCf)
-      //      get.setTimeout(this.singleGetTimeout.toShort)
-      get.setFailfast(true)
-      get.maxVersions(1)
-
-      fetchVertexKeyValues(Left(get)).map { kvs =>
-        fromResult(kvs, vertex.serviceColumn.schemaVersion)
-      }
-//      val cacheKey = MurmurHash3.stringHash(get.toString)
-//      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 
-1)(fetchVertexKeyValues(Left(get))).map { kvs =>
-//        fromResult(kvs, vertex.serviceColumn.schemaVersion)
-//      }
-    }
-
-    Future.sequence(futures).map { result => result.toList.flatten }
-  }
-
-  //TODO: Limited to 100000 edges per hbase table. fix this later.
-  override def fetchEdgesAll(): Future[Seq[S2Edge]] = {
-    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case 
(hTableName, labels) =>
-      val distinctLabels = labels.toSet
-      val scan = AsynchbasePatcher.newScanner(client, hTableName)
-      scan.setFamily(Serializable.edgeCf)
-      scan.setMaxVersions(1)
-
-      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
-        case null => Seq.empty
-        case kvsLs =>
-          kvsLs.flatMap { kvs =>
-            kvs.flatMap { kv =>
-              val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-
-              indexEdgeDeserializer.fromKeyValues(Seq(kv), None)
-                .filter(e => distinctLabels(e.innerLabel) && e.direction == 
"out" && !e.isDegree)
-            }
-          }
-      }
-    }
-
-    Future.sequence(futures).map(_.flatten)
-  }
-
-  override def fetchVerticesAll(): Future[Seq[S2Vertex]] = {
-    val futures = 
ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case 
(hTableName, columns) =>
-      val distinctColumns = columns.toSet
-      val scan = AsynchbasePatcher.newScanner(client, hTableName)
-      scan.setFamily(Serializable.vertexCf)
-      scan.setMaxVersions(1)
-
-      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
-        case null => Seq.empty
-        case kvsLs =>
-          kvsLs.flatMap { kvs =>
-            vertexDeserializer.fromKeyValues(kvs, None)
-              .filter(v => distinctColumns(v.serviceColumn))
-          }
-      }
-    }
-    Future.sequence(futures).map(_.flatten)
-  }
-
-  class V4ResultHandler(scanner: Scanner, defer: 
Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends 
Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {
-    val results = new util.ArrayList[KeyValue]()
-    var offsetCount = 0
-
-    override def call(kvsLs: util.ArrayList[util.ArrayList[KeyValue]]): Object 
= {
-      try {
-        if (kvsLs == null) {
-          defer.callback(results)
-          Try(scanner.close())
-        } else {
-          val curRet = new util.ArrayList[KeyValue]()
-          kvsLs.foreach(curRet.addAll(_))
-          val prevOffset = offsetCount
-          offsetCount += curRet.size()
-
-          val nextRet = if(offsetCount > offset){
-            if(prevOffset < offset ) {
-              curRet.subList(offset - prevOffset, curRet.size())
-            } else{
-              curRet
-            }
-          } else{
-            emptyKeyValues
-          }
-
-          val needCount = limit - results.size()
-          if (needCount >= nextRet.size()) {
-            results.addAll(nextRet)
-          } else {
-            results.addAll(nextRet.subList(0, needCount))
-          }
-
-          if (results.size() < limit) {
-            scanner.nextRows().addCallback(this)
-          } else {
-            defer.callback(results)
-            Try(scanner.close())
-          }
-        }
-      } catch{
-        case ex: Exception =>
-          logger.error(s"fetchKeyValuesInner failed.", ex)
-          defer.callback(ex)
-          Try(scanner.close())
-      }
-    }
-  }
-
-  /**
-   * Private Methods which is specific to Asynchbase implementation.
-   */
-  private def fetchKeyValuesInner(rpc: AsyncRPC): 
Deferred[util.ArrayList[KeyValue]] = {
-    rpc match {
-      case Left(get) => client.get(get)
-      case Right(ScanWithRange(scanner, offset, limit)) =>
-        val deferred = new Deferred[util.ArrayList[KeyValue]]()
-        scanner.nextRows().addCallback(new V4ResultHandler(scanner, deferred, 
offset, limit))
-        deferred
-      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues 
failed. $rpc"))
-    }
-  }
-
-  private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
-    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise 
GetRequest. */
-    hbaseRpc match {
-      case Left(getRequest) => getRequest.key
-      case Right(ScanWithRange(scanner, offset, limit)) =>
-        Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), 
Bytes.toBytes(limit)))
-      case _ =>
-        logger.error(s"toCacheKeyBytes failed. not supported class type. 
$hbaseRpc")
-        throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
-    }
-  }
-
-  private def getSecureClusterAdmin(zkAddr: String) = {
-    val jaas = config.getString("java.security.auth.login.config")
-    val krb5Conf = config.getString("java.security.krb5.conf")
-    val realm = config.getString("realm")
-    val principal = config.getString("principal")
-    val keytab = config.getString("keytab")
-
-    System.setProperty("java.security.auth.login.config", jaas)
-    System.setProperty("java.security.krb5.conf", krb5Conf)
-    // System.setProperty("sun.security.krb5.debug", "true")
-    // System.setProperty("sun.security.spnego.debug", "true")
-    val conf = new Configuration(true)
-    val hConf = HBaseConfiguration.create(conf)
-
-    hConf.set("hbase.zookeeper.quorum", zkAddr)
-
-    hConf.set("hadoop.security.authentication", "Kerberos")
-    hConf.set("hbase.security.authentication", "Kerberos")
-    hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm)
-    hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm)
-
-    System.out.println("Connecting secure cluster, using keytab\n")
-    UserGroupInformation.setConfiguration(hConf)
-    UserGroupInformation.loginUserFromKeytab(principal, keytab)
-    val currentUser = UserGroupInformation.getCurrentUser()
-    System.out.println("current user : " + currentUser + "\n")
-
-    // get table list
-    val conn = ConnectionFactory.createConnection(hConf)
-    conn.getAdmin
-  }
-
-  private def withAdmin(zkAddr: String)(op: Admin => Unit): Unit = {
-    val admin = getAdmin(zkAddr)
-    try {
-      op(admin)
-    } finally {
-      admin.close()
-      admin.getConnection.close()
-    }
-  }
-  /**
-   * following configuration need to come together to use secured hbase 
cluster.
-   * 1. set hbase.security.auth.enable = true
-   * 2. set file path to jaas file java.security.auth.login.config
-   * 3. set file path to kerberos file java.security.krb5.conf
-   * 4. set realm
-   * 5. set principal
-   * 6. set file path to keytab
-   * @param zkAddr
-   * @return
-   */
-  private def getAdmin(zkAddr: String) = {
-    if (config.hasPath("hbase.security.auth.enable") && 
config.getBoolean("hbase.security.auth.enable")) {
-      getSecureClusterAdmin(zkAddr)
-    } else {
-      val conf = HBaseConfiguration.create()
-      conf.set("hbase.zookeeper.quorum", zkAddr)
-      val conn = ConnectionFactory.createConnection(conf)
-      conn.getAdmin
-    }
-  }
-
-  private def enableTable(zkAddr: String, tableName: String) = {
-    withAdmin(zkAddr) { admin =>
-      admin.enableTable(TableName.valueOf(tableName))
-    }
-  }
-
-  private def disableTable(zkAddr: String, tableName: String) = {
-    withAdmin(zkAddr) { admin =>
-      admin.disableTable(TableName.valueOf(tableName))
-    }
-  }
-
-  private def dropTable(zkAddr: String, tableName: String) = {
-    withAdmin(zkAddr) { admin =>
-      admin.disableTable(TableName.valueOf(tableName))
-      admin.deleteTable(TableName.valueOf(tableName))
-    }
-  }
-
-  private def getStartKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount))
-  }
-
-  private def getEndKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
-  }
 }

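For the Asynchbase backend, the resolver's acquireLock/releaseLock ultimately rely on HBase's atomic compare-and-set, which the removed writeLock above performed directly and which now lives behind StorageWritable. A minimal sketch of that contract against the raw Asynchbase client (the table/row/cf/qualifier byte arrays are assumed to be in hand; casPut is illustrative):

import com.stumbleupon.async.Deferred
import org.hbase.async.{HBaseClient, PutRequest}

// Write newValue only if the cell still holds expected (an empty array means
// "the cell must not exist yet"); HBase evaluates the check and the put atomically,
// so a concurrent writer makes this resolve to false instead of overwriting the lock.
def casPut(client: HBaseClient,
           table: Array[Byte], row: Array[Byte], cf: Array[Byte],
           qualifier: Array[Byte], newValue: Array[Byte],
           expected: Array[Byte]): Deferred[java.lang.Boolean] = {
  val put = new PutRequest(table, row, cf, qualifier, newValue)
  client.compareAndSet(put, expected)
}
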
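The secured-cluster admin path being moved out of AsynchbaseStorage reads its Kerberos settings from the typesafe Config (the keys reappear as constants in AsynchbaseStorageManagement below). A hedged example of what such a configuration might look like; every value here is a placeholder:

import com.typesafe.config.ConfigFactory

// Placeholder values only; the keys mirror those listed in the patch.
val secureHBaseConfig = ConfigFactory.parseString(
  """
    |hbase.security.auth.enable = true
    |java.security.auth.login.config = "/etc/s2graph/jaas.conf"
    |java.security.krb5.conf = "/etc/krb5.conf"
    |realm = "EXAMPLE.COM"
    |principal = "s2graph@EXAMPLE.COM"
    |keytab = "/etc/s2graph/s2graph.keytab"
    |hbase.zookeeper.quorum = "zk1.example.com,zk2.example.com"
  """.stripMargin)
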
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
new file mode 100644
index 0000000..c55c6c7
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala
@@ -0,0 +1,263 @@
+package org.apache.s2graph.core.storage.hbase
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
+import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability}
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.s2graph.core.storage.StorageManagement
+import org.apache.s2graph.core.utils.{Extensions, logger}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, duration}
+import scala.util.Try
+import scala.util.control.NonFatal
+
+object AsynchbaseStorageManagement {
+  /* Secure cluster */
+  val SecurityAuthEnabled = "hbase.security.auth.enable"
+  val Jaas = "java.security.auth.login.config"
+  val Krb5Conf = "java.security.krb5.conf"
+  val Realm = "realm"
+  val Principal = "principal"
+  val Keytab = "keytab"
+  val HadoopAuthentication = "hadoop.security.authentication"
+  val HBaseAuthentication = "hbase.security.authentication"
+  val MasterKerberosPrincipal = "hbase.master.kerberos.principal"
+  val RegionServerKerberosPrincipal = "hbase.regionserver.kerberos.principal"
+
+
+  val DefaultCreateTableOptions = Map(
+    "hbase.zookeeper.quorum" -> "localhost"
+  )
+}
+
+class AsynchbaseStorageManagement(val config: Config, val clients: 
Seq[HBaseClient]) extends StorageManagement {
+  import org.apache.s2graph.core.Management._
+  import AsynchbaseStorageManagement._
+  import Extensions.DeferOps
+
+  /**
+    * Asynchbase client setup.
+    * note that we need two clients: one for bulk (withWait=false) and another for withWait=true.
+    */
+  private val clientFlushInterval = 
config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
+
+  /**
+    * this method needs to be called when the client shuts down. it is responsible for cleaning up
+    * resources such as the clients held by this storage.
+    */
+  override def flush(): Unit = clients.foreach { client =>
+    implicit val ec = 
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
+
+    val timeout = Duration((clientFlushInterval + 10) * 20, 
duration.MILLISECONDS)
+    Await.result(client.flush().toFuture(new AnyRef), timeout)
+  }
+
+  def getOption[T](config: Config, key: String): Option[T] = {
+    import scala.util._
+    Try { config.getAnyRef(key).asInstanceOf[T] }.toOption
+  }
+  /**
+    * create table on storage.
+    * if the storage implementation does not support namespaces or tables, then there is nothing to be done.
+    *
+    * @param config
+    */
+  override def createTable(config: Config, tableNameStr: String): Unit = {
+    val zkAddr = config.getString(ZookeeperQuorum)
+
+    withAdmin(config) { admin =>
+      val regionMultiplier = getOption[Int](config, 
RegionMultiplier).getOrElse(0)
+      val regionCount = getOption[Int](config, 
TotalRegionCount).getOrElse(admin.getClusterStatus.getServersSize * 
regionMultiplier)
+      val cfs = getOption[Seq[String]](config, 
ColumnFamilies).getOrElse(DefaultColumnFamilies)
+      val compressionAlgorithm = getOption[String](config, 
CompressionAlgorithm).getOrElse(DefaultCompressionAlgorithm)
+      val ttl = getOption[Int](config, Ttl)
+      val replicationScoreOpt = getOption[Int](config, ReplicationScope)
+
+      val tableName = TableName.valueOf(tableNameStr)
+      try {
+        if (!admin.tableExists(tableName)) {
+          val desc = new HTableDescriptor(tableName)
+          desc.setDurability(Durability.ASYNC_WAL)
+          for (cf <- cfs) {
+            val columnDesc = new HColumnDescriptor(cf)
+              
.setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+              .setBloomFilterType(BloomType.ROW)
+              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+              .setMaxVersions(1)
+              .setTimeToLive(2147483647)
+              .setMinVersions(0)
+              .setBlocksize(32768)
+              .setBlockCacheEnabled(true)
+              // FIXME: For test!!
+              .setInMemory(true)
+
+            ttl.foreach(columnDesc.setTimeToLive(_))
+            replicationScoreOpt.foreach(columnDesc.setScope(_))
+
+            desc.addFamily(columnDesc)
+          }
+
+          if (regionCount <= 1) admin.createTable(desc)
+          else admin.createTable(desc, getStartKey(regionCount), 
getEndKey(regionCount), regionCount)
+        } else {
+          logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+        }
+      } catch {
+        case e: Throwable =>
+          logger.error(s"$zkAddr, $tableName failed with $e", e)
+          throw e
+      }
+    }
+  }
+
+  /**
+    *
+    * @param config
+    * @param tableNameStr
+    */
+  override def truncateTable(config: Config, tableNameStr: String): Unit = {
+    withAdmin(config) { admin =>
+      val tableName = TableName.valueOf(tableNameStr)
+      if (!Try(admin.tableExists(tableName)).getOrElse(false)) {
+        logger.info(s"No table to truncate ${tableNameStr}")
+        return
+      }
+
+      Try(admin.isTableDisabled(tableName)).map {
+        case true =>
+          logger.info(s"${tableNameStr} is already disabled.")
+
+        case false =>
+          logger.info(s"Before disabling to trucate ${tableNameStr}")
+          Try(admin.disableTable(tableName)).recover {
+            case NonFatal(e) =>
+              logger.info(s"Failed to disable ${tableNameStr}: ${e}")
+          }
+          logger.info(s"After disabling to trucate ${tableNameStr}")
+      }
+
+      logger.info(s"Before truncating ${tableNameStr}")
+      Try(admin.truncateTable(tableName, true)).recover {
+        case NonFatal(e) =>
+          logger.info(s"Failed to truncate ${tableNameStr}: ${e}")
+      }
+      logger.info(s"After truncating ${tableNameStr}")
+      Try(admin.close()).recover {
+        case NonFatal(e) =>
+          logger.info(s"Failed to close admin ${tableNameStr}: ${e}")
+      }
+      Try(admin.getConnection.close()).recover {
+        case NonFatal(e) =>
+          logger.info(s"Failed to close connection ${tableNameStr}: ${e}")
+      }
+    }
+  }
+
+  /**
+    *
+    * @param config
+    * @param tableNameStr
+    */
+  override def deleteTable(config: Config, tableNameStr: String): Unit = {
+    withAdmin(config) { admin =>
+      val tableName = TableName.valueOf(tableNameStr)
+      if (!admin.tableExists(tableName)) {
+        return
+      }
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName)
+      }
+      admin.deleteTable(tableName)
+    }
+  }
+
+  /**
+    *
+    */
+  override def shutdown(): Unit = {
+    flush()
+    clients.foreach { client =>
+      AsynchbaseStorage.shutdown(client)
+    }
+  }
+
+
+
+  private def getSecureClusterAdmin(config: Config) = {
+    val zkAddr = config.getString(ZookeeperQuorum)
+    val realm = config.getString(Realm)
+    val principal = config.getString(Principal)
+    val keytab = config.getString(Keytab)
+
+    System.setProperty(Jaas, config.getString(Jaas))
+    System.setProperty(Krb5Conf, config.getString(Krb5Conf))
+
+
+    val conf = new Configuration(true)
+    val hConf = HBaseConfiguration.create(conf)
+
+    hConf.set(ZookeeperQuorum, zkAddr)
+
+    hConf.set(HadoopAuthentication, "Kerberos")
+    hConf.set(HBaseAuthentication, "Kerberos")
+    hConf.set(MasterKerberosPrincipal, "hbase/_HOST@" + realm)
+    hConf.set(RegionServerKerberosPrincipal, "hbase/_HOST@" + realm)
+
+    System.out.println("Connecting secure cluster, using keytab\n")
+    UserGroupInformation.setConfiguration(hConf)
+    UserGroupInformation.loginUserFromKeytab(principal, keytab)
+    val currentUser = UserGroupInformation.getCurrentUser()
+    System.out.println("current user : " + currentUser + "\n")
+
+    // get table list
+    val conn = ConnectionFactory.createConnection(hConf)
+    conn.getAdmin
+  }
+
+  private def withAdmin(config: Config)(op: Admin => Unit): Unit = {
+    val admin = getAdmin(config)
+    try {
+      op(admin)
+    } finally {
+      admin.close()
+      admin.getConnection.close()
+    }
+  }
+  /**
+    * the following configuration entries must be provided together to use a secured hbase cluster.
+    * 1. set hbase.security.auth.enable = true
+    * 2. set java.security.auth.login.config to the jaas file path
+    * 3. set java.security.krb5.conf to the kerberos config file path
+    * 4. set realm
+    * 5. set principal
+    * 6. set keytab to the keytab file path
+    */
+  private def getAdmin(config: Config) = {
+    if (config.hasPath(SecurityAuthEnabled) && 
config.getBoolean(SecurityAuthEnabled)) {
+      getSecureClusterAdmin(config)
+    } else {
+      val zkAddr = config.getString(ZookeeperQuorum)
+      val conf = HBaseConfiguration.create()
+      conf.set(ZookeeperQuorum, zkAddr)
+      val conn = ConnectionFactory.createConnection(conf)
+      conn.getAdmin
+    }
+  }
+
+  private def getStartKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes((Int.MaxValue / regionCount))
+  }
+
+  private def getEndKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
+  }
+}
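
createTable above pre-splits new tables between getStartKey and getEndKey. A quick worked example of that arithmetic, with regionCount = 4 chosen purely for illustration:

import org.apache.hadoop.hbase.util.Bytes

object PreSplitSketch {
  def main(args: Array[String]): Unit = {
    val regionCount = 4
    // Int.MaxValue is 2147483647, so with 4 regions:
    val start = Int.MaxValue / regionCount                      // 536870911
    val end   = Int.MaxValue / regionCount * (regionCount - 1)  // 1610612733

    val startKey: Array[Byte] = Bytes.toBytes(start)
    val endKey: Array[Byte]   = Bytes.toBytes(end)

    // admin.createTable(desc, startKey, endKey, regionCount) then asks HBase to
    // create regionCount regions whose boundaries are spread evenly between the two keys.
    println(s"start=$start end=$end")
  }
}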

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
new file mode 100644
index 0000000..4ef95b8
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -0,0 +1,335 @@
+package org.apache.s2graph.core.storage.hbase
+
+import java.util
+import java.util.Base64
+
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.storage.serde._
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, 
ScanWithRange}
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
+import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
+import org.hbase.async._
+
+import scala.collection.JavaConversions._
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseStorageReadable(val graph: S2Graph,
+                                val config: Config,
+                                val client: HBaseClient,
+                                val serDe: StorageSerDe,
+                                override val io: StorageIO) extends 
StorageReadable[AsyncRPC] {
+  import Extensions.DeferOps
+  import CanDefer._
+
+  private val emptyKeyValues = new util.ArrayList[KeyValue]()
+  private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
+  private val emptyStepResult = new util.ArrayList[StepResult]()
+
+  /** Future Cache to squash requests */
+  lazy private val futureCache = new DeferCache[StepResult, Deferred, 
Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
+  /** v4 max next row size */
+  private val v4_max_num_rows = 10000
+  private def getV4MaxNumRows(limit : Int): Int = {
+    if (limit < v4_max_num_rows) limit
+    else v4_max_num_rows
+  }
+
+  /**
+    * build the storage-specific request used to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
+    * for example, Asynchbase uses GetRequest and Scanner, so this method is responsible for building
+    * the client request (GetRequest or Scanner) from the user-provided query.
+    *
+    * @param queryRequest
+    * @return
+    */
+  override def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
+    import Serializable._
+    val queryParam = queryRequest.queryParam
+    val label = queryParam.label
+
+    val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+      val snapshotEdge = edge.toSnapshotEdge
+      serDe.snapshotEdgeSerializer(snapshotEdge)
+    } else {
+      val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
+      serDe.indexEdgeSerializer(indexEdge)
+    }
+
+    val rowKey = serializer.toRowKey
+    val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
+
+    val (intervalMaxBytes, intervalMinBytes) = 
queryParam.buildInterval(Option(edge))
+
+    label.schemaVersion match {
+      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
+        val scanner = AsynchbasePatcher.newScanner(client, 
label.hbaseTableName)
+        scanner.setFamily(edgeCf)
+
+        /*
+         * TODO: remove this part.
+         */
+        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => 
edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
+        val indexEdge = indexEdgeOpt.getOrElse(throw new 
RuntimeException(s"Can`t find index for query $queryParam"))
+
+        val srcIdBytes = 
VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+        val labelWithDirBytes = indexEdge.labelWithDir.bytes
+        val labelIndexSeqWithIsInvertedBytes = 
StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, 
isInverted = false)
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, 
labelIndexSeqWithIsInvertedBytes)
+
+        val (startKey, stopKey) =
+          if (queryParam.intervalOpt.isDefined) {
+            // interval is set.
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => Bytes.add(baseKey, intervalMaxBytes)
+            }
+            (_startKey , Bytes.add(baseKey, intervalMinBytes))
+          } else {
+            /*
+             * note: since propsToBytes encodes the size of the property map in its first byte,
+             * we can be sure about the max value here
+             */
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => baseKey
+            }
+            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+          }
+
+        scanner.setStartKey(startKey)
+        scanner.setStopKey(stopKey)
+
+        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: 
$queryParam")
+
+        scanner.setMaxVersions(1)
+        // TODO: exclusive condition innerOffset with cursorOpt
+        if (queryParam.cursorOpt.isDefined) {
+          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
+        } else {
+          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + 
queryParam.innerLimit))
+        }
+        scanner.setMaxTimestamp(maxTs)
+        scanner.setMinTimestamp(minTs)
+        scanner.setRpcTimeout(queryParam.rpcTimeout)
+
+        // SET option for this rpc properly.
+        if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, 
queryParam.limit))
+        else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + 
queryParam.innerLimit))
+
+      case _ =>
+        val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, 
serializer.toQualifier)
+        } else {
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
+        }
+
+        get.maxVersions(1)
+        get.setFailfast(true)
+        get.setMinTimestamp(minTs)
+        get.setMaxTimestamp(maxTs)
+        get.setTimeout(queryParam.rpcTimeout)
+
+        val pagination = new ColumnPaginationFilter(queryParam.limit, 
queryParam.offset)
+        val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
+          new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
+        }
+        get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, 
MUST_PASS_ALL))
+        Left(get)
+    }
+  }
+
+  /**
+    *
+    * @param queryRequest
+    * @param vertex
+    * @return
+    */
+  override def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = {
+    val kvs = serDe.vertexSerializer(vertex).toKeyValues
+    val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, 
Serializable.vertexCf)
+    //      get.setTimeout(this.singleGetTimeout.toShort)
+    get.setFailfast(true)
+    get.maxVersions(1)
+    Left(get)
+  }
+
+  /**
+    * responsible for firing parallel fetch calls into storage and creating a future that returns the merged result.
+    *
+    * @param queryRequests
+    * @param prevStepEdges
+    * @return
+    */
+  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: 
Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = {
+    val defers: Seq[Deferred[StepResult]] = for {
+      queryRequest <- queryRequests
+    } yield {
+      val queryOption = queryRequest.query.queryOption
+      val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryOption.returnTree || 
queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) 
prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+      fetch(queryRequest, isInnerCall = false, parentEdges)
+    }
+
+    val grouped: Deferred[util.ArrayList[StepResult]] = 
Deferred.groupInOrder(defers)
+    grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
+      queryResults.toSeq
+    }.toFuture(emptyStepResult)
+  }
+
+  override def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
+    val defer = fetchKeyValuesInner(rpc)
+    defer.toFuture(emptyKeyValues).map { kvsArr =>
+      kvsArr.map { kv =>
+        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+      }
+    }
+  }
+
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): 
Future[Seq[S2Edge]] = {
+    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case 
(hTableName, labels) =>
+      val distinctLabels = labels.toSet
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.edgeCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
+        case null => Seq.empty
+        case kvsLs =>
+          kvsLs.flatMap { kvs =>
+            kvs.flatMap { kv =>
+              val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+
+              serDe.indexEdgeDeserializer(schemaVer = 
HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+                .filter(e => distinctLabels(e.innerLabel) && e.direction == 
"out" && !e.isDegree)
+            }
+          }
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+    val futures = 
ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case 
(hTableName, columns) =>
+      val distinctColumns = columns.toSet
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.vertexCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
+        case null => Seq.empty
+        case kvsLs =>
+          kvsLs.flatMap { kvs =>
+            serDe.vertexDeserializer(schemaVer = 
HBaseType.DEFAULT_VERSION).fromKeyValues(kvs, None)
+              .filter(v => distinctColumns(v.serviceColumn))
+          }
+      }
+    }
+    Future.sequence(futures).map(_.flatten)
+  }
+
+
+  /**
+    * we use a future cache to squash concurrent requests for the same key on storage.
+    *
+    * @param queryRequest
+    * @param isInnerCall
+    * @param parentEdges
+    * @return we use Deferred here since it performs much better than scala.concurrent.Future;
+    *         map and flatMap on scala.concurrent.Future seem slower than Deferred's addCallback.
+    */
+  private def fetch(queryRequest: QueryRequest,
+                    isInnerCall: Boolean,
+                    parentEdges: Seq[EdgeWithScore])(implicit ec: 
ExecutionContext): Deferred[StepResult] = {
+
+    def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
+      val prevStepScore = queryRequest.prevStepScore
+      val fallbackFn: (Exception => StepResult) = { ex =>
+        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
+        StepResult.Failure
+      }
+
+      val queryParam = queryRequest.queryParam
+      
fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs 
=>
+        val (startOffset, len) = queryParam.label.schemaVersion match {
+          case HBaseType.VERSION4 =>
+            val offset = if (queryParam.cursorOpt.isDefined) 0 else 
queryParam.offset
+            (offset, queryParam.limit)
+          case _ => (0, kvs.length)
+        }
+
+        io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, 
startOffset, len)
+      }
+    }
+
+    val queryParam = queryRequest.queryParam
+    val cacheTTL = queryParam.cacheTTLInMillis
+    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise 
GetRequest. */
+
+    val edge = Storage.toRequestEdge(graph)(queryRequest, parentEdges)
+    val request = buildRequest(queryRequest, edge)
+
+    val (intervalMaxBytes, intervalMinBytes) = 
queryParam.buildInterval(Option(edge))
+    val requestCacheKey = Bytes.add(toCacheKeyBytes(request), 
intervalMaxBytes, intervalMinBytes)
+
+    if (cacheTTL <= 0) fetchInner(request)
+    else {
+      val cacheKeyBytes = 
Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+
+      //      val cacheKeyBytes = toCacheKeyBytes(request)
+      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+      futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+    }
+  }
+
+  /**
+    * Private methods specific to the Asynchbase implementation.
+    */
+  private def fetchKeyValuesInner(rpc: AsyncRPC)(implicit ec: 
ExecutionContext): Deferred[util.ArrayList[KeyValue]] = {
+    rpc match {
+      case Left(get) => client.get(get)
+      case Right(ScanWithRange(scanner, offset, limit)) =>
+        val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex =>
+          logger.error(s"fetchKeyValuesInner failed.", ex)
+          scanner.close()
+          emptyKeyValues
+        }
+
+        scanner.nextRows().mapWithFallback(new 
util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs =>
+          val ls = new util.ArrayList[KeyValue]
+          if (kvsLs != null) {
+            kvsLs.foreach { kvs =>
+              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
+            }
+          }
+
+          scanner.close()
+          val toIndex = Math.min(ls.size(), offset + limit)
+          new util.ArrayList[KeyValue](ls.subList(offset, toIndex))
+        }
+      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues 
failed. $rpc"))
+    }
+  }
+
+  private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
+    /* with version 4, request's type is (Scanner, (Int, Int)). otherwise 
GetRequest. */
+    hbaseRpc match {
+      case Left(getRequest) => getRequest.key
+      case Right(ScanWithRange(scanner, offset, limit)) =>
+        Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), 
Bytes.toBytes(limit)))
+      case _ =>
+        logger.error(s"toCacheKeyBytes failed. not supported class type. 
$hbaseRpc")
+        throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
+    }
+  }
+}
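
The fetch path above uses a future cache (DeferCache) to squash concurrent requests for the same cache key into one storage call. The sketch below illustrates only the general squashing pattern with plain scala Futures and made-up names; the real DeferCache additionally handles TTL expiry and works directly with Deferred.

import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future, Promise}

// Illustrative only: concurrent callers asking for the same key share one fetch.
class InflightCache[K, V] {
  private val inflight = TrieMap.empty[K, Future[V]]

  def getOrElseUpdate(key: K)(fetch: => Future[V])(implicit ec: ExecutionContext): Future[V] = {
    val promise = Promise[V]()
    inflight.putIfAbsent(key, promise.future) match {
      case Some(existing) => existing            // another caller already fired the request
      case None =>
        fetch.onComplete { result =>             // we won the race: hit storage once
          promise.complete(result)
          inflight.remove(key)                   // later requests will fetch again
        }
        promise.future
    }
  }
}

A caller would write cache.getOrElseUpdate(cacheKey)(fetchInner(request)) so that overlapping identical queries share a single round trip, which is essentially what futureCache.getOrElseUpdate does above, with a TTL on top.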
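
Relatedly, the VERSION4 branch of buildRequest composes the scanner's start and stop keys by appending interval bytes to a base key. The snippet below only illustrates that composition with Bytes.add; the base key and interval bytes are placeholders, whereas the real values come from the serialized source vertex id, label, index sequence and queryParam.buildInterval.

import org.apache.hadoop.hbase.util.Bytes

object ScanRangeSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder bytes, for illustration only.
    val baseKey          = Bytes.toBytes("srcId|label|index")
    val intervalMaxBytes = Bytes.toBytes(200L)
    val intervalMinBytes = Bytes.toBytes(100L)

    // With an interval, the patch scans from baseKey ++ intervalMaxBytes down to
    // baseKey ++ intervalMinBytes, which suggests the interval encoding sorts in reverse.
    val startKey = Bytes.add(baseKey, intervalMaxBytes)
    val stopKey  = Bytes.add(baseKey, intervalMinBytes)

    // Without an interval, the stop key is baseKey ++ 0xFF, as in the patch.
    val stopKeyNoInterval = Bytes.add(baseKey, Array.fill[Byte](1)(-1))

    println(s"startKey=${startKey.length}B stopKey=${stopKey.length}B openEndedStop=${stopKeyNoInterval.length}B")
  }
}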

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala
new file mode 100644
index 0000000..ab1ff19
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala
@@ -0,0 +1,68 @@
+package org.apache.s2graph.core.storage.hbase
+
+import org.apache.s2graph.core.storage.serde.Deserializable
+import org.apache.s2graph.core.{IndexEdge, S2Graph, S2Vertex, SnapshotEdge}
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe, serde}
+
+class AsynchbaseStorageSerDe(val graph: S2Graph) extends StorageSerDe {
+  import org.apache.s2graph.core.types.HBaseType._
+
+  /**
+    * create a serializer that knows how to convert the given snapshotEdge into kvs: Seq[SKeyValue]
+    * so we can store those kvs.
+    *
+    * @param snapshotEdge : snapshotEdge to serialize
+    * @return serializer implementation of StorageSerializable whose toKeyValues returns Seq[SKeyValue]
+    */
+  override def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = {
+    snapshotEdge.schemaVer match {
+      //      case VERSION1 |
+      case VERSION2 => new 
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
+      case VERSION3 | VERSION4 => new 
serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
+      case _ => throw new RuntimeException(s"not supported version: 
${snapshotEdge.schemaVer}")
+    }
+  }
+
+  /**
+    * create a serializer that knows how to convert the given indexEdge into kvs: Seq[SKeyValue]
+    *
+    * @param indexEdge : indexEdge to serialize
+    * @return serializer implementation
+    */
+  override def indexEdgeSerializer(indexEdge: IndexEdge) = {
+    indexEdge.schemaVer match {
+      //      case VERSION1
+      case VERSION2 | VERSION3 => new 
serde.indexedge.wide.IndexEdgeSerializable(indexEdge)
+      case VERSION4 => new 
serde.indexedge.tall.IndexEdgeSerializable(indexEdge)
+      case _ => throw new RuntimeException(s"not supported version: 
${indexEdge.schemaVer}")
+    }
+  }
+
+  /**
+    * create a serializer that knows how to convert the given vertex into kvs: Seq[SKeyValue]
+    *
+    * @param vertex : vertex to serialize
+    * @return serializer implementation
+    */
+  override def vertexSerializer(vertex: S2Vertex) = new 
serde.vertex.wide.VertexSerializable(vertex)
+
+  /**
+    * create a deserializer that can parse stored CanSKeyValue into a snapshotEdge.
+    * note that each storage implementation should provide an implicit type class instance
+    * to convert its storage-dependent data type into the common SKeyValue type by implementing CanSKeyValue.
+    *
+    * ex) Asynchbase uses its own KeyValue class, and the CanSKeyValue object has an implicit type conversion method.
+    * if any storage uses a different class to represent the stored byte array,
+    * then that storage implementation is responsible for providing an implicit type conversion method on CanSKeyValue.
+    **/
+  private val snapshotEdgeDeserializable = new 
serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
+  override def snapshotEdgeDeserializer(schemaVer: String) = 
snapshotEdgeDeserializable
+
+  /** create a deserializer that can parse stored CanSKeyValue into an indexEdge. */
+  private val indexEdgeDeserializer = new 
serde.indexedge.tall.IndexEdgeDeserializable(graph)
+  override def indexEdgeDeserializer(schemaVer: String) = indexEdgeDeserializer
+
+  /** create a deserializer that can parse stored CanSKeyValue into a vertex. */
+  private val vertexDeserializer = new 
serde.vertex.wide.VertexDeserializable(graph)
+  override def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] 
= vertexDeserializer
+}
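
The deserializer comments above lean on the CanSKeyValue type class to convert a storage-specific key-value type into the common SKeyValue. A minimal, self-contained sketch of that pattern, using made-up stand-in types rather than the real s2core definitions:

object CanSKeyValueSketch {
  // Stand-ins for illustration; the real SKeyValue / CanSKeyValue live in s2core.
  case class MySKeyValue(row: Array[Byte], qualifier: Array[Byte], value: Array[Byte], timestamp: Long)

  trait CanConvertToSKeyValue[T] {
    def toSKeyValue(kv: T): MySKeyValue
  }

  // Each storage backend supplies an instance for its own key-value class.
  case class FakeStorageKeyValue(key: Array[Byte], qualifier: Array[Byte], value: Array[Byte], ts: Long)

  implicit val fakeCanConvert: CanConvertToSKeyValue[FakeStorageKeyValue] =
    new CanConvertToSKeyValue[FakeStorageKeyValue] {
      override def toSKeyValue(kv: FakeStorageKeyValue): MySKeyValue =
        MySKeyValue(kv.key, kv.qualifier, kv.value, kv.ts)
    }

  // Deserialization code stays storage-agnostic: it only asks for the type class instance.
  def toSKeyValues[T](kvs: Seq[T])(implicit ev: CanConvertToSKeyValue[T]): Seq[MySKeyValue] =
    kvs.map(ev.toSKeyValue)
}

A new storage backend would only need to provide its own implicit instance; the shared deserializers remain unchanged.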
