http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
deleted file mode 100644
index 337ed3f..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import java.util
-import java.util.concurrent.TimeUnit
-import com.google.common.cache.CacheBuilder
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.QueryBuilder
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.{Extensions, logger}
-import com.stumbleupon.async.Deferred
-import org.apache.hadoop.hbase.util.Bytes
-import org.hbase.async.GetRequest
-import scala.annotation.tailrec
-import scala.collection.JavaConversions._
-import scala.collection.{Map, Seq}
-import scala.util.Random
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionContext)
-  extends QueryBuilder[GetRequest, Deferred[QueryRequestWithResult]](storage) {
-
-  import Extensions.DeferOps
-
-  val maxSize = storage.config.getInt("future.cache.max.size")
-  val expreAfterWrite = storage.config.getInt("future.cache.expire.after.write")
-  val expreAfterAccess = storage.config.getInt("future.cache.expire.after.access")
-
-  val futureCache = CacheBuilder.newBuilder()
-//  .recordStats()
-  .initialCapacity(maxSize)
-  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
-  .expireAfterWrite(expreAfterWrite, TimeUnit.MILLISECONDS)
-  .expireAfterAccess(expreAfterAccess, TimeUnit.MILLISECONDS)
-//  .weakKeys()
-  .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[QueryRequestWithResult])]()
-
-  //  val scheduleTime = 60L * 60
-//  val scheduleTime = 60
-//  val scheduler = Executors.newScheduledThreadPool(1)
-//
-//  scheduler.scheduleAtFixedRate(new Runnable(){
-//    override def run() = {
-//      logger.info(s"[FutureCache]: ${futureCache.stats()}")
-//    }
-//  }, scheduleTime, scheduleTime, TimeUnit.SECONDS)
-
-  override def buildRequest(queryRequest: QueryRequest): GetRequest = {
-    val srcVertex = queryRequest.vertex
-    //    val tgtVertexOpt = queryRequest.tgtVertexOpt
-    val edgeCf = HSerializable.edgeCf
-
-    val queryParam = queryRequest.queryParam
-    val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
-    val label = queryParam.label
-    val labelWithDir = queryParam.labelWithDir
-    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-    val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
-      case Some(tgtVertexId) => // _to is given.
-        /** we use toSnapshotEdge so dont need to swap src, tgt */
-        val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
-        val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
-        (src, tgt)
-      case None =>
-        val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
-        (src, src)
-    }
-
-    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId))
-    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
-    val currentTs = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)).toMap
-    val edge = Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs)
-
-    val get = if (tgtVertexIdOpt.isDefined) {
-      val snapshotEdge = edge.toSnapshotEdge
-      val kv = storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
-      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
-    } else {
-      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq)
-      assert(indexedEdgeOpt.isDefined)
-
-      val indexedEdge = indexedEdgeOpt.get
-      val kv = storage.indexEdgeSerializer(indexedEdge).toKeyValues.head
-      val table = label.hbaseTableName.getBytes
-      val rowKey = kv.row
-      val cf = edgeCf
-      new GetRequest(table, rowKey, cf)
-    }
-
-    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
-
-    get.maxVersions(1)
-    get.setFailfast(true)
-    get.setMaxResultsPerColumnFamily(queryParam.limit)
-    get.setRowOffsetPerColumnFamily(queryParam.offset)
-    get.setMinTimestamp(minTs)
-    get.setMaxTimestamp(maxTs)
-    get.setTimeout(queryParam.rpcTimeoutInMillis)
-
-    if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter)
-
-    get
-  }
-
-  override def getEdge(srcVertex: Vertex, tgtVertex: Vertex, queryParam: QueryParam, isInnerCall: Boolean): Deferred[QueryRequestWithResult] = {
-    //TODO:
-    val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(tgtVertex.innerId))
-    val q = Query.toQuery(Seq(srcVertex), _queryParam)
-    val queryRequest = QueryRequest(q, 0, srcVertex, _queryParam)
-    fetch(queryRequest, 1.0, isInnerCall = true, parentEdges = Nil)
-  }
-
-  override def fetch(queryRequest: QueryRequest,
-                     prevStepScore: Double,
-                     isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
-    @tailrec
-    def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-      if (range < sampleNumber || set.size == sampleNumber) set
-      else randomInt(sampleNumber, range, set + Random.nextInt(range))
-    }
-
-    def sample(edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
-      if (edges.size <= n){
-        edges
-      }else{
-        val plainEdges = if (queryRequest.queryParam.offset == 0) {
-          edges.tail
-        } else edges
-
-        val randoms = randomInt(n, plainEdges.size)
-        var samples = List.empty[EdgeWithScore]
-        var idx = 0
-        plainEdges.foreach { e =>
-          if (randoms.contains(idx)) samples = e :: samples
-          idx += 1
-        }
-        samples
-      }
-
-    }
-    def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
-      val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
-      edgeWithScores.map { edgeWithScore =>
-        edgeWithScore.copy(score = edgeWithScore.score / sum)
-      }
-    }
-    def fetchInner(request: GetRequest) = {
-      storage.client.get(request) withCallback { kvs =>
-        val edgeWithScores = storage.toEdges(kvs.toSeq, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
-
-        val normalized =
-          if (queryRequest.queryParam.shouldNormalize) normalize(edgeWithScores)
-          else edgeWithScores
-
-        val resultEdgesWithScores =
-          if (queryRequest.queryParam.sample >= 0 ) sample(normalized, queryRequest.queryParam.sample)
-          else normalized
-
-        QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores))
-      } recoverWith { ex =>
-        logger.error(s"fetchQueryParam failed. fallback return.", ex)
-        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
-      }
-    }
-    def checkAndExpire(request: GetRequest,
-                       cacheKey: Long,
-                       cacheTTL: Long,
-                       cachedAt: Long,
-                       defer: Deferred[QueryRequestWithResult]): Deferred[QueryRequestWithResult] = {
-      if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
-        // future is too old. so need to expire and fetch new data from storage.
-        futureCache.asMap().remove(cacheKey)
-        val newPromise = new Deferred[QueryRequestWithResult]()
-        futureCache.asMap().putIfAbsent(cacheKey, (System.currentTimeMillis(), newPromise)) match {
-          case null =>
-            // only one thread succeed to come here concurrently
-            // initiate fetch to storage then add callback on complete to finish promise.
-            fetchInner(request) withCallback { queryRequestWithResult =>
-              newPromise.callback(queryRequestWithResult)
-              queryRequestWithResult
-            }
-            newPromise
-          case (cachedAt, oldDefer) => oldDefer
-        }
-      } else {
-        // future is not to old so reuse it.
-        defer
-      }
-    }
-
-    val queryParam = queryRequest.queryParam
-    val cacheTTL = queryParam.cacheTTLInMillis
-    val request = buildRequest(queryRequest)
-
-    if (cacheTTL <= 0) fetchInner(request)
-    else {
-      val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
-      val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-
-      val cacheVal = futureCache.getIfPresent(cacheKey)
-      cacheVal match {
-        case null =>
-          // here there is no promise set up for this cacheKey so we need to set promise on future cache.
-          val promise = new Deferred[QueryRequestWithResult]()
-          val now = System.currentTimeMillis()
-          val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
-            case null =>
-              fetchInner(request) withCallback { queryRequestWithResult =>
-                promise.callback(queryRequestWithResult)
-                queryRequestWithResult
-              }
-              (now, promise)
-            case oldVal => oldVal
-          }
-          checkAndExpire(request, cacheKey, cacheTTL, cachedAt, defer)
-        case (cachedAt, defer) =>
-          checkAndExpire(request, cacheKey, cacheTTL, cachedAt, defer)
-      }
-    }
-
-  }
-
-
-  override def toCacheKeyBytes(getRequest: GetRequest): Array[Byte] = {
-    var bytes = getRequest.key()
-    Option(getRequest.family()).foreach(family => bytes = Bytes.add(bytes, family))
-    Option(getRequest.qualifiers()).foreach {
-      qualifiers =>
-        qualifiers.filter(q => Option(q).isDefined).foreach {
-          qualifier =>
-            bytes = Bytes.add(bytes, qualifier)
-        }
-    }
-    //    if (getRequest.family() != null) bytes = Bytes.add(bytes, getRequest.family())
-    //    if (getRequest.qualifiers() != null) getRequest.qualifiers().filter(_ != null).foreach(q => bytes = Bytes.add(bytes, q))
-    bytes
-  }
-
-
-  override def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
-                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]] = {
-    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
-      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
-      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
-    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
-
-    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
-    grouped withCallback {
-      queryResults: util.ArrayList[QueryRequestWithResult] =>
-        queryResults.toIndexedSeq
-    } toFuture
-  }
-}
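
For context: the core trick in the deleted builder above is a TTL-bounded future cache that collapses concurrent identical GetRequests into a single in-flight Deferred (the putIfAbsent/checkAndExpire dance inside fetch()). A minimal sketch of that pattern, assuming Guava's CacheBuilder and suasync's Deferred on the classpath; the name DeferSquashCache and its parameters are illustrative, not from the codebase:

    import java.util.concurrent.TimeUnit

    import com.google.common.cache.CacheBuilder
    import com.stumbleupon.async.{Callback, Deferred}

    class DeferSquashCache[V](maxSize: Int, ttlMillis: Long) {
      private val cache = CacheBuilder.newBuilder()
        .maximumSize(maxSize)
        .expireAfterWrite(ttlMillis, TimeUnit.MILLISECONDS)
        .build[java.lang.Long, (Long, Deferred[V])]()

      /** Return the in-flight Deferred for `key`, or run `fetch` exactly once. */
      def getOrElseUpdate(key: Long)(fetch: => Deferred[V]): Deferred[V] = {
        val promise = new Deferred[V]()
        val now = System.currentTimeMillis()
        cache.asMap().putIfAbsent(key, (now, promise)) match {
          case null =>
            // this caller won the race: fire the real fetch, then complete the shared promise.
            fetch.addCallback(new Callback[V, V] {
              override def call(v: V): V = { promise.callback(v); v }
            })
            promise
          case (cachedAt, inFlight) =>
            if (now >= cachedAt + ttlMillis) {
              // entry is stale: evict it and let exactly one caller re-fetch.
              cache.asMap().remove(key)
              getOrElseUpdate(key)(fetch)
            } else inFlight
        }
      }
    }

The replacing commit factors this logic into the reusable DeferCache in com.kakao.s2graph.core.utils, which AsynchbaseStorage below consumes via futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)).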

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index a83aacd..7c05aed 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -1,17 +1,13 @@
 package com.kakao.s2graph.core.storage.hbase
 
-import java.util
 
-import com.google.common.cache.Cache
-import com.kakao.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
-import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException
 import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{Label, LabelMeta}
-import com.kakao.s2graph.core.storage.Storage
+import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.storage._
 import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.{Extensions, logger}
+import com.kakao.s2graph.core.utils.{FutureCache, DeferCache, Extensions, logger}
 import com.stumbleupon.async.Deferred
-import com.typesafe.config.Config
+import com.typesafe.config.{ConfigFactory, Config}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
@@ -19,20 +15,20 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.hadoop.security.UserGroupInformation
 import org.hbase.async._
 import scala.collection.JavaConversions._
-import scala.collection.Seq
+import scala.collection.{Map, Seq}
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future, duration}
-import scala.util.Random
 import scala.util.hashing.MurmurHash3
+import java.util
+
 
 
 object AsynchbaseStorage {
-  val vertexCf = HSerializable.vertexCf
-  val edgeCf = HSerializable.edgeCf
+  val vertexCf = Serializable.vertexCf
+  val edgeCf = Serializable.edgeCf
   val emptyKVs = new util.ArrayList[KeyValue]()
 
 
@@ -63,757 +59,245 @@ object AsynchbaseStorage {
   }
 }
 
-class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, Option[Vertex]])
-                       (implicit ec: ExecutionContext) extends Storage(config) {
-
-  import AsynchbaseStorage._
 
-  //  import Extensions.FutureOps
+class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
+  extends Storage[Deferred[QueryRequestWithResult]](config) {
 
   import Extensions.DeferOps
 
+  /**
+   * Asynchbase client setup.
+   * note that we need two clients: one for bulk (withWait=false) and another for withWait=true.
+   */
+  val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0")))
   val client = AsynchbaseStorage.makeClient(config)
-  val queryBuilder = new AsynchbaseQueryBuilder(this)(ec)
-  val mutationBuilder = new AsynchbaseMutationBuilder(this)(ec)
-
-  //  val cacheOpt = Option(cache)
-  val cacheOpt = None
-  val vertexCacheOpt = Option(vertexCache)
 
   private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
   private val clients = Seq(client, clientWithFlush)
-
   private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
-  val MaxRetryNum = config.getInt("max.retry.number")
-  val MaxBackOff = config.getInt("max.back.off")
-  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
-  val FailProb = config.getDouble("hbase.fail.prob")
-  val LockExpireDuration = config.getInt("lock.expire.time")
-
-  /**
-    * Serializer/Deserializer
-    */
-  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = new SnapshotEdgeSerializable(snapshotEdge)
-
-  def indexEdgeSerializer(indexedEdge: IndexEdge) = new IndexEdgeSerializable(indexedEdge)
-
-  def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
-
-  val snapshotEdgeDeserializer = new SnapshotEdgeDeserializable
-  val indexEdgeDeserializer = new IndexEdgeDeserializable
-  val vertexDeserializer = new VertexDeserializable
-
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = queryBuilder.getEdges(q)
-
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = {
-    val futures = for {
-      (srcVertex, tgtVertex, queryParam) <- params
-    } yield queryBuilder.getEdge(srcVertex, tgtVertex, queryParam, false).toFuture
-
-    Future.sequence(futures)
-  }
-
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[org.hbase.async.KeyValue],
-                   version: String): Option[Vertex] = {
-
-      if (kvs.isEmpty) None
-      else {
-        val newKVs = kvs
-        Option(vertexDeserializer.fromKeyValues(queryParam, newKVs, version, None))
-      }
-    }
-
-    val futures = vertices.map { vertex =>
-      val kvs = vertexSerializer(vertex).toKeyValues
-      val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, vertexCf)
-      //      get.setTimeout(this.singleGetTimeout.toShort)
-      get.setFailfast(true)
-      get.maxVersions(1)
-
-      val cacheKey = MurmurHash3.stringHash(get.toString)
-      val cacheVal = vertexCache.getIfPresent(cacheKey)
-      if (cacheVal == null)
-        client.get(get).toFutureWith(emptyKVs).map { kvs =>
-          fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
-        }
+  private val emptyKeyValues = new util.ArrayList[KeyValue]()
+  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
 
-      else Future.successful(cacheVal)
-    }
+  /** Future Cache to squash requests */
+  private val futureCache = new DeferCache[QueryResult](config)(ec)
 
-    Future.sequence(futures).map { result => result.toList.flatten }
-  }
+  /** Simple Vertex Cache */
+  private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
 
 
-  def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] = {
-    val edgeFuture =
-      if (edge.op == GraphUtil.operations("deleteAll")) {
-        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts)
-      } else {
-        val strongConsistency = edge.label.consistencyLevel == "strong"
-        if (edge.op == GraphUtil.operations("delete") && !strongConsistency) {
-          val zkQuorum = edge.label.hbaseZkAddr
-          val (_, edgeUpdate) = Edge.buildDeleteBulk(None, edge)
-          val mutations =
-            mutationBuilder.indexedEdgeMutations(edgeUpdate) ++
-              mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++
-              mutationBuilder.increments(edgeUpdate)
-          writeAsyncSimple(zkQuorum, mutations, withWait)
-        } else {
-          mutateEdgesInner(Seq(edge), strongConsistency, withWait)(Edge.buildOperation)
+  /**
+   * fire RPCs into the proper HBase cluster using the client and
+   * return true only when all mutations succeed; otherwise return false.
+   */
+  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
+    if (kvs.isEmpty) Future.successful(true)
+    else {
+      val _client = client(withWait)
+      val futures = kvs.map { kv =>
+        val _defer = kv.operation match {
+          case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp))
+          case SKeyValue.Delete =>
+            if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
+            else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp))
+          case SKeyValue.Increment =>
+            _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
         }
-      }
-
-    val vertexFuture = writeAsyncSimple(edge.label.hbaseZkAddr,
-      mutationBuilder.buildVertexPutsAsync(edge), withWait)
+        val future = _defer.withCallback { ret => true }.recoverWith { ex =>
+          logger.error(s"mutation failed. $kv", ex)
+          false
+        }.toFuture
 
-    Future.sequence(Seq(edgeFuture, vertexFuture)).map(_.forall(identity))
-  }
-
-  override def mutateEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
-    val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq
-
-    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
-      val (deleteAllEdges, edges) = edgeGroup.partition(_.op == GraphUtil.operations("deleteAll"))
-
-      // DeleteAll first
-      val deleteAllFutures = deleteAllEdges.map { edge =>
-        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts)
-      }
-
-      // After deleteAll, process others
-      lazy val mutateEdgeFutures = edges.toList match {
-        case head :: tail =>
-          val strongConsistency = edges.head.label.consistencyLevel == "strong"
-          if (strongConsistency) {
-            val edgeFuture = mutateEdgesInner(edges, strongConsistency, withWait)(Edge.buildOperation)
-
-            //TODO: decide what we will do on failure on vertex put
-            val puts = mutationBuilder.buildVertexPutsAsync(head)
-            val vertexFuture = writeAsyncSimple(head.label.hbaseZkAddr, puts, withWait)
-            Seq(edgeFuture, vertexFuture)
-          } else {
-            edges.map { edge => mutateEdge(edge, withWait = withWait) }
-          }
-        case Nil => Nil
+        if (withWait) future else Future.successful(true)
       }
 
-      val composed = for {
-        deleteRet <- Future.sequence(deleteAllFutures)
-        mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield deleteRet ++ mutateRet
-
-      composed.map(_.forall(identity))
+      Future.sequence(futures).map(_.forall(identity))
     }
-
-    Future.sequence(mutateEdges)
   }
 
-  def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = {
-    if (vertex.op == GraphUtil.operations("delete")) {
-      writeAsyncSimple(vertex.hbaseZkAddr, mutationBuilder.buildDeleteAsync(vertex), withWait)
-    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
-      logger.info(s"deleteAll for vertex is truncated. $vertex")
-      Future.successful(true) // Ignore withWait parameter, because deleteAll operation may takes long time
-    } else {
-      writeAsyncSimple(vertex.hbaseZkAddr, mutationBuilder.buildPutsAll(vertex), withWait)
-    }
-  }
 
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
-    val _client = if (withWait) clientWithFlush else client
-    val defers: Seq[Deferred[(Boolean, Long)]] = for {
-      edge <- edges
-    } yield {
-      val edgeWithIndex = edge.edgesWithIndex.head
-      val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
-      val countVal = countWithTs.innerVal.toString().toLong
-      val incr = mutationBuilder.buildIncrementsCountAsync(edgeWithIndex, countVal).head
-      val request = incr.asInstanceOf[AtomicIncrementRequest]
-      val defer = _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
-        (true, resultCount.longValue())
-      } recoverWith { ex =>
-        logger.error(s"mutation failed. $request", ex)
-        (false, -1L)
-      }
-      if (withWait) defer
-      else Deferred.fromResult((true, -1L))
+  override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = {
+    val defer = fetchKeyValuesInner(hbaseRpc)
+    defer.toFuture.map { kvsArr =>
+      kvsArr.map { kv =>
+        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+      } toSeq
     }
-
-    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
-    grouped.toFuture.map(_.toSeq)
   }
 
-  private def writeAsyncSimpleRetry(zkQuorum: String, elementRpcs: Seq[HBaseRpc], withWait: Boolean): Future[Boolean] = {
-    def compute = writeAsyncSimple(zkQuorum, elementRpcs, withWait).flatMap { ret =>
-      if (ret) Future.successful(ret)
-      else throw FetchTimeoutException("writeAsyncWithWaitRetrySimple")
-    }
-    Extensions.retryOnFailure(MaxRetryNum) {
-      compute
-    } {
-      logger.error(s"writeAsyncWithWaitRetrySimple: $elementRpcs")
-      false
-    }
+  /**
+   * since HBase natively provides CheckAndSet at the storage level, the implementation becomes simple.
+   * @param rpc: key value that needs to be stored on storage.
+   * @param expectedOpt: last valid value for the rpc's KeyValue.value from fetching.
+   * @return true if the expected value matches and our rpc is successfully applied, otherwise false.
+   *         note that when some other thread has modified the same cell with a different value,
+   *         HBase atomically returns false.
+   */
+  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = {
+    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
+    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
+    client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture
   }
 
-  private def writeToStorage(_client: HBaseClient, rpc: HBaseRpc): Deferred[Boolean] = {
-    //    logger.debug(s"$rpc")
-    val defer = rpc match {
-      case d: DeleteRequest => _client.delete(d)
-      case p: PutRequest => _client.put(p)
-      case i: AtomicIncrementRequest => _client.bufferAtomicIncrement(i)
-    }
-    defer withCallback { ret => true } recoverWith { ex =>
-      logger.error(s"mutation failed. $rpc", ex)
-      false
-    }
-  }
 
-  private def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[HBaseRpc], withWait: Boolean): Future[Boolean] = {
-    val _client = if (withWait) clientWithFlush else client
-    if (elementRpcs.isEmpty) {
-      Future.successful(true)
-    } else {
-      val defers = elementRpcs.map { rpc => writeToStorage(_client, rpc) }
-      if (withWait)
-        Deferred.group(defers).toFuture map { arr => arr.forall(identity) }
-      else
-        Future.successful(true)
-    }
-  }
+  /**
+   * given a queryRequest, build a storage-specific RPC request.
+   * In the HBase case, we build either a Scanner or a GetRequest.
+   *
+   * IndexEdge layer:
+   *    Tall schema(v4): use Scanner.
+   *    Wide schema(label's schema version in v1, v2, v3): use GetRequest with columnRangeFilter
+   *                                                       when the query is given with the interval option.
+   * SnapshotEdge layer:
+   *    Tall schema(v3, v4): use GetRequest without column filter.
+   *    Wide schema(label's schema version in v1, v2): use GetRequest with columnRangeFilter.
+   * Vertex layer:
+   *    all versions: use GetRequest without column filter.
+   * @param queryRequest
+   * @return Scanner or GetRequest properly set up with StartKey, EndKey, RangeFilter.
+   */
+  override def buildRequest(queryRequest: QueryRequest): AnyRef = {
+    import Serializable._
+    val queryParam = queryRequest.queryParam
+    val label = queryParam.label
+    val edge = toRequestEdge(queryRequest)
 
-  private def writeAsync(zkQuorum: String, elementRpcs: Seq[Seq[HBaseRpc]], withWait: Boolean): Future[Seq[Boolean]] = {
-    val _client = if (withWait) clientWithFlush else client
-    if (elementRpcs.isEmpty) {
-      Future.successful(Seq.empty[Boolean])
+    val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+      val snapshotEdge = edge.toSnapshotEdge
+      snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
+      //      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
     } else {
-      val futures = elementRpcs.map { rpcs =>
-        val defers = rpcs.map { rpc => writeToStorage(_client, rpc) }
-        if (withWait)
-          Deferred.group(defers).toFuture map { arr => arr.forall(identity) }
-        else
-          Future.successful(true)
-      }
-      if (withWait)
-        Future.sequence(futures)
-      else
-        Future.successful(elementRpcs.map(_ => true))
-    }
-  }
-
-  private def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[KeyValue])] = {
-    val labelWithDir = edge.labelWithDir
-    val queryParam = QueryParam(labelWithDir)
-    val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-    val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-    val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-
-    client.get(queryBuilder.buildRequest(queryRequest)) withCallback { kvs =>
-      val (edgeOpt, kvOpt) =
-        if (kvs.isEmpty()) (None, None)
-        else {
-          val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, parentEdges = Nil).headOption.map(_.edge)
-          val _kvOpt = kvs.headOption
-          (_edgeOpt, _kvOpt)
-        }
-      (queryParam, edgeOpt, kvOpt)
-    } recoverWith { ex =>
-      logger.error(s"fetchQueryParam failed. fallback return.", ex)
-      throw new FetchTimeoutException(s"${edge.toLogString}")
-    } toFuture
-  }
-
-
-  case class PartialFailureException(edge: Edge, statusCode: Byte, faileReason: String) extends Exception
-
-  def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = {
-    val msg = Seq(s"[$ret] [$phase]", 
s"${snapshotEdge.toLogString()}").mkString("\n")
-    logger.debug(msg)
-  }
-
-  def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = {
-    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
-      s"${edgeMutate.toLogString}").mkString("\n")
-    logger.debug(msg)
-  }
+      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq)
+      assert(indexedEdgeOpt.isDefined)
 
-  private def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[KeyValue]) = {
-    val currentTs = System.currentTimeMillis()
-    val lockTs = snapshotEdgeOpt match {
-      case None => Option(currentTs)
-      case Some(snapshotEdge) =>
-        snapshotEdge.pendingEdgeOpt match {
-          case None => Option(currentTs)
-          case Some(pendingEdge) => pendingEdge.lockTs
-        }
+      val indexedEdge = indexedEdgeOpt.get
+      indexEdgeSerializer(indexedEdge).toKeyValues.head
     }
-    val newVersion = kvOpt.map(_.timestamp()).getOrElse(edge.ts) + 1
-    //      snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
-    val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs)
-    val base = snapshotEdgeOpt match {
-      case None =>
-        // no one ever mutated on this snapshotEdge.
-        edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
-      case Some(snapshotEdge) =>
-        // there is at least one mutation have been succeed.
-        snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
-    }
-    base.copy(version = newVersion, statusCode = 1, lockTs = None)
-  }
 
-  private def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge,
-                                   edgeMutate: EdgeMutate) = {
-    val newVersion = lockEdge.version + 1
-    val base = edgeMutate.newSnapshotEdge match {
-      case None =>
-        // shouldReplace false
-        assert(snapshotEdgeOpt.isDefined)
-        snapshotEdgeOpt.get.toSnapshotEdge
-      case Some(newSnapshotEdge) => newSnapshotEdge
-    }
-    base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None)
-  }
+    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
+    val get =
+      if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
+      else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
 
-  def mutate(predicate: Boolean,
-             edge: Edge,
-             statusCode: Byte,
-             _edgeMutate: EdgeMutate): Future[Boolean] = {
-    if (!predicate) throw new PartialFailureException(edge, 1, "predicate 
failed.")
+    get.maxVersions(1)
+    get.setFailfast(true)
+    get.setMaxResultsPerColumnFamily(queryParam.limit)
+    get.setRowOffsetPerColumnFamily(queryParam.offset)
+    get.setMinTimestamp(minTs)
+    get.setMaxTimestamp(maxTs)
+    get.setTimeout(queryParam.rpcTimeoutInMillis)
 
-    if (statusCode >= 2) {
-      logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p")
-      else
-        writeAsyncSimple(edge.label.hbaseZkAddr, mutationBuilder.indexedEdgeMutations(_edgeMutate), withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate)
-          } else {
-            throw new PartialFailureException(edge, 1, "hbase fail.")
-          }
-          true
-        }
-    }
-  }
+    if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter)
 
-  def increment(predicate: Boolean,
-                edge: Edge,
-                statusCode: Byte, _edgeMutate: EdgeMutate): Future[Boolean] = {
-    if (!predicate) throw new PartialFailureException(edge, 2, "predicate 
failed.")
-    if (statusCode >= 3) {
-      logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p")
-      else
-        writeAsyncSimple(edge.label.hbaseZkAddr, mutationBuilder.increments(_edgeMutate), withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate)
-          } else {
-            throw new PartialFailureException(edge, 2, "hbase fail.")
-          }
-          true
-        }
-    }
+    get
   }
 
-  def acquireLock(statusCode: Byte, edge: Edge,
-                  lockEdge: SnapshotEdge, oldBytes: Array[Byte]): Future[Boolean] =
-    if (statusCode >= 1) {
-      logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p")
-      else {
-        val lockEdgePut = toPutRequest(lockEdge)
-        client.compareAndSet(lockEdgePut, oldBytes).toFuture.recoverWith {
-          case ex: Exception =>
-            logger.error(s"AcquireLock RPC Failed.")
-            throw new PartialFailureException(edge, 0, "AcquireLock RPC 
Failed")
-        }.map { ret =>
-          if (ret) {
-            val log = Seq(
-              "\n",
-              "=" * 50,
-              s"[Success]: acquireLock",
-              s"[RequestEdge]: ${edge.toLogString}",
-              s"[LockEdge]: ${lockEdge.toLogString()}",
-              s"[PendingEdge]: 
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
-              "=" * 50, "\n").mkString("\n")
-
-            logger.debug(log)
-            //            debug(ret, "acquireLock", edge.toSnapshotEdge)
-          } else {
-            throw new PartialFailureException(edge, 0, "hbase fail.")
-          }
-          true
-        }
-      }
-    }
+  /**
+   * we use a future cache to squash requests for the same key on storage.
+   *
+   * @param queryRequest
+   * @param prevStepScore
+   * @param isInnerCall
+   * @param parentEdges
+   * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future;
+   *         map and flatMap on scala.concurrent.Future seem slower than Deferred's addCallback.
+   */
+  override def fetch(queryRequest: QueryRequest,
+                     prevStepScore: Double,
+                     isInnerCall: Boolean,
+                     parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
+
+    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
+      fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
+        val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
+        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
+          sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        } else edgeWithScores
+        QueryResult(resultEdgesWithScores)
+        //        QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
 
-  def releaseLock(predicate: Boolean,
-                  edge: Edge,
-                  lockEdge: SnapshotEdge,
-                  releaseLockEdge: SnapshotEdge,
-                  _edgeMutate: EdgeMutate,
-                  oldBytes: Array[Byte]): Future[Boolean] = {
-    if (!predicate) {
-      throw new PartialFailureException(edge, 3, "predicate failed.")
-    }
-    val p = Random.nextDouble()
-    if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p")
-    else {
-      val releaseLockEdgePut = toPutRequest(releaseLockEdge)
-      val lockEdgePut = toPutRequest(lockEdge)
-
-      client.compareAndSet(releaseLockEdgePut, lockEdgePut.value()).toFuture.recoverWith {
-        case ex: Exception =>
-          logger.error(s"ReleaseLock RPC Failed.")
-          throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed")
-      }.map { ret =>
-        if (ret) {
-          debug(ret, "releaseLock", edge.toSnapshotEdge)
-        } else {
-          val msg = Seq("\nFATAL ERROR\n",
-            "=" * 50,
-            oldBytes.toList,
-            lockEdgePut.value.toList,
-            releaseLockEdgePut.value().toList,
-            "=" * 50,
-            "\n"
-          )
-          logger.error(msg.mkString("\n"))
-          //          error(ret, "releaseLock", edge.toSnapshotEdge)
-          throw new PartialFailureException(edge, 3, "hbase fail.")
-        }
-        true
+      } recoverWith { ex =>
+        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
+        QueryResult(isFailure = true)
+        //        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
       }
     }
-    //      }
-  }
-
-  private def toPutRequest(snapshotEdge: SnapshotEdge): PutRequest = {
-    mutationBuilder.buildPutAsync(snapshotEdge).head.asInstanceOf[PutRequest]
-  }
 
+    val queryParam = queryRequest.queryParam
+    val cacheTTL = queryParam.cacheTTLInMillis
+    val request = buildRequest(queryRequest)
 
-  private def commitUpdate(edge: Edge,
-                           statusCode: Byte)(snapshotEdgeOpt: Option[Edge],
-                                             kvOpt: Option[KeyValue],
-                                             edgeUpdate: EdgeMutate): Future[Boolean] = {
-    val label = edge.label
-    def oldBytes = kvOpt.map(_.value()).getOrElse(Array.empty)
-    //    def oldBytes = snapshotEdgeOpt.map { e =>
-    //      snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head.value
-    //    }.getOrElse(Array.empty)
-    def process(lockEdge: SnapshotEdge,
-                releaseLockEdge: SnapshotEdge,
-                _edgeMutate: EdgeMutate,
-                statusCode: Byte): Future[Boolean] = {
-
-      for {
-        locked <- acquireLock(statusCode, edge, lockEdge, oldBytes)
-        mutated <- mutate(locked, edge, statusCode, _edgeMutate)
-        incremented <- increment(mutated, edge, statusCode, _edgeMutate)
-        released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, _edgeMutate, oldBytes)
-      } yield {
-        released
+    val defer =
+      if (cacheTTL <= 0) fetchInner(request)
+      else {
+        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
       }
-    }
-
-
-    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
-    val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate)
-    snapshotEdgeOpt match {
-      case None =>
-        // no one ever did success on acquire lock.
-        process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
-      case Some(snapshotEdge) =>
-        // someone did success on acquire lock at least one.
-        snapshotEdge.pendingEdgeOpt match {
-          case None =>
-            // not locked
-            process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
-          case Some(pendingEdge) =>
-            def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
-            if (isLockExpired) {
-              val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge)
-              val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge))
-              val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt)
-              val _newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate)
-
-              // set lock ts as current ts
-              val newPendingEdgeOpt = _newReleaseLockEdge.pendingEdgeOpt.map(_.copy(lockTs = Option(System.currentTimeMillis())))
-              val newReleaseLockEdge = _newReleaseLockEdge.copy(pendingEdgeOpt = newPendingEdgeOpt)
-
-              process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret =>
-                val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}"
-                throw new PartialFailureException(edge, 0, log)
-              }
-            } else {
-              // locked
-              if (pendingEdge.ts == edge.ts && statusCode > 0) {
-                // self locked
-                val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge)
-                val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(edge))
-                val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, newEdgeUpdate)
-
-                /** lockEdge will be ignored */
-                process(lockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode)
-              } else {
-                throw new PartialFailureException(edge, statusCode, s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]")
-              }
-            }
-        }
-    }
+    defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
   }
 
-  private def mutateEdgesInner(edges: Seq[Edge],
-                               checkConsistency: Boolean,
-                               withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => (Edge, EdgeMutate)): Future[Boolean] = {
-    if (!checkConsistency) {
-      val zkQuorum = edges.head.label.hbaseZkAddr
-      val futures = edges.map { edge =>
-        val (_, edgeUpdate) = f(None, Seq(edge))
-        val mutations =
-          mutationBuilder.indexedEdgeMutations(edgeUpdate) ++
-            mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++
-            mutationBuilder.increments(edgeUpdate)
-        writeAsyncSimple(zkQuorum, mutations, withWait)
-      }
-      Future.sequence(futures).map { rets => rets.forall(identity) }
-    } else {
-      def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = {
-
-        fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
-
-          val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges)
-          //shouldReplace false.
-          if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) {
-            logger.debug(s"${newEdge.toLogString} drop.")
-            Future.successful(true)
-          } else {
-            commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, edgeUpdate).map { ret =>
-              if (ret) {
-                logger.info(s"[Success] commit: 
\n${_edges.map(_.toLogString).mkString("\n")}")
-              } else {
-                throw new PartialFailureException(newEdge, 3, "commit failed.")
-              }
-              true
-            }
-          }
-        }
-      }
-      def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: (Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = {
-        if (tryNum >= MaxRetryNum) {
-          edges.foreach { edge =>
-            logger.error(s"commit failed after 
$MaxRetryNum\n${edge.toLogString}")
-            ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = 
edge))
-          }
-          Future.successful(false)
-        } else {
-          val future = fn(edges, statusCode)
-          future.onSuccess {
-            case success =>
-              logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
-          }
-          future recoverWith {
-            case FetchTimeoutException(retryEdge) =>
-              logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
-              retry(tryNum + 1)(edges, statusCode)(fn)
-
-            case PartialFailureException(retryEdge, failedStatusCode, faileReason) =>
-              val status = failedStatusCode match {
-                case 0 => "AcquireLock failed."
-                case 1 => "Mutation failed."
-                case 2 => "Increment failed."
-                case 3 => "ReleaseLock failed."
-                case 4 => "Unknown"
-              }
-
-              Thread.sleep(Random.nextInt(MaxBackOff))
-              logger.info(s"[Try: $tryNum], [Status: $status] partial 
fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
-              retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn)
-            case ex: Exception =>
-              logger.error("Unknown exception", ex)
-              Future.successful(false)
-          }
-        }
-      }
-      retry(1)(edges, 0)(commit)
-    }
-  }
 
+  override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
+                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
+    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
+      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
+      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
+    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
 
-  def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
-                newEdge: Edge, edgeMutate: EdgeMutate) = {
-    Seq("----------------------------------------------",
-      s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
-      s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
-      s"newEdge: ${newEdge.toLogString}",
-      s"mutation: \n${edgeMutate.toLogString}",
-      "----------------------------------------------").mkString("\n")
+    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
+    grouped withCallback {
+      queryResults: util.ArrayList[QueryRequestWithResult] =>
+        queryResults.toIndexedSeq
+    } toFuture
   }
 
-  private def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
-                                            queryResult: QueryResult,
-                                            requestTs: Long,
-                                            retryNum: Int): Future[Boolean] = {
-    val queryParam = queryRequest.queryParam
-    val zkQuorum = queryParam.label.hbaseZkAddr
-    val futures = for {
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield {
-      /** reverted direction */
-      val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexedEdge =>
-        mutationBuilder.buildDeletesAsync(indexedEdge) ++ mutationBuilder.buildIncrementsAsync(indexedEdge, -1L)
-      }
-      val reversedSnapshotEdgeMutations = mutationBuilder.buildDeleteAsync(edge.toSnapshotEdge)
-      val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexedEdge =>
-        mutationBuilder.buildDeletesAsync(indexedEdge) ++ mutationBuilder.buildIncrementsAsync(indexedEdge, -1L)
-      }
-      val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-      writeAsyncSimple(zkQuorum, mutations, withWait = true)
-    }
 
-    Future.sequence(futures).map { rets => rets.forall(identity) }
-  }
+  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request)
 
-  private def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = {
-    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get
-    val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
-      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }.map { edgeWithScore =>
-      val label = queryRequest.queryParam.label
-      val newPropsWithTs = edgeWithScore.edge.propsWithTs ++
-        Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
-      val copiedEdge = edgeWithScore.edge.copy(op = GraphUtil.operations("delete"), version = requestTs,
-        propsWithTs = newPropsWithTs)
-      edgeWithScore.copy(edge = copiedEdge)
-    }
-    queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
-  }
 
-  private def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
-    val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
-    queryResultLs.foreach { queryResult =>
-      if (queryResult.isFailure) throw new RuntimeException("fetched result is 
fallback.")
-    }
-
-    val futures = for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-      deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
-      if deleteQueryResult.edgeWithScoreLs.nonEmpty
+  /**
+   * when withWait is given, we use the client with flushInterval set to 0.
+   * otherwise we would add up to flushInterval of extra wait time in the worst case.
+   *
+   * @param edges
+   * @param withWait
+   * @return
+   */
+  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
+    val _client = client(withWait)
+    val defers: Seq[Deferred[(Boolean, Long)]] = for {
+      edge <- edges
     } yield {
-      val label = queryRequest.queryParam.label
-      label.schemaVersion match {
-        case HBaseType.VERSION3 if label.consistencyLevel == "strong" =>
-
-          /**
-            * read: snapshotEdge on queryResult = O(N)
-            * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
-            */
-          mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
-        case _ =>
-
-          /**
-            * read: x
-            * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
-            */
-          deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+        val edgeWithIndex = edge.edgesWithIndex.head
+        val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
+        val countVal = countWithTs.innerVal.toString().toLong
+        val incr = buildIncrementsCountAsync(edgeWithIndex, countVal).head
+        val request = incr.asInstanceOf[AtomicIncrementRequest]
+        _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
+          (true, resultCount.longValue())
+        } recoverWith { ex =>
+          logger.error(s"mutation failed. $request", ex)
+          (false, -1L)
+        }
       }
-    }
-    if (futures.isEmpty) {
-      // all deleted.
-      Future.successful(true -> true)
-    } else {
-      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
-    }
-  }
-
-  def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = {
-    val future = for {
-      queryRequestWithResultLs <- getEdges(query)
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs)
-    } yield {
-      (allDeleted, ret)
-    }
-    Extensions.retryOnFailure(MaxRetryNum) {
-      future
-    } {
-      logger.error(s"fetch and deleteAll failed.")
-      (true, false)
-    }
 
+    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
+    grouped.toFuture.map(_.toSeq)
   }
 
-  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
-                             labels: Seq[Label],
-                             dir: Int,
-                             ts: Long): Future[Boolean] = {
-
-    def enqueueLogMessage() = {
-      val kafkaMessages = for {
-        vertice <- srcVertices
-        id = vertice.innerId.toIdString()
-        label <- labels
-      } yield {
-        val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
GraphUtil.fromOp(dir.toByte)).mkString("\t")
-        val topic = ExceptionHandler.failTopic
-        val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
-        kafkaMsg
-      }
-
-      ExceptionHandler.enqueues(kafkaMessages)
-    }
 
-    val requestTs = ts
-    val queryParams = for {
-      label <- labels
-    } yield {
-      val labelWithDir = LabelWithDirection(label.id.get, dir)
-      QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
-    }
-
-    val step = Step(queryParams.toList)
-    val q = Query(srcVertices, Vector(step))
-
-    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
-    val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
-      fetchAndDeleteAll(q, requestTs)
-    } { case (allDeleted, deleteSuccess) =>
-      allDeleted && deleteSuccess
-    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
-
-    retryFuture onFailure {
-      case ex =>
-        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
-        enqueueLogMessage()
-    }
-
-    retryFuture
-  }
-
-  def flush(): Unit = clients.foreach { client =>
+  override def flush(): Unit = clients.foreach { client =>
     val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
     Await.result(client.flush().toFuture, timeout)
   }
 
 
-  def createTable(zkAddr: String,
-                  tableName: String,
-                  cfs: List[String],
-                  regionMultiplier: Int,
-                  ttl: Option[Int],
-                  compressionAlgorithm: String): Unit = {
+  override def createTable(zkAddr: String,
+                           tableName: String,
+                           cfs: List[String],
+                           regionMultiplier: Int,
+                           ttl: Option[Int],
+                           compressionAlgorithm: String): Unit = {
     logger.info(s"create table: $tableName on $zkAddr, $cfs, 
$regionMultiplier, $compressionAlgorithm")
     val admin = getAdmin(zkAddr)
     val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
@@ -847,6 +331,77 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer,
     }
   }
 
+
+  /** the Asynchbase implementation overrides the default getVertices to use the future cache */
+  override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+    def fromResult(queryParam: QueryParam,
+                   kvs: Seq[SKeyValue],
+                   version: String): Option[Vertex] = {
+
+      if (kvs.isEmpty) None
+      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+    }
+
+    val futures = vertices.map { vertex =>
+      val kvs = vertexSerializer(vertex).toKeyValues
+      val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
+      //      get.setTimeout(this.singleGetTimeout.toShort)
+      get.setFailfast(true)
+      get.maxVersions(1)
+
+      val cacheKey = MurmurHash3.stringHash(get.toString)
+      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs =>
+        fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
+      }
+    }
+
+    Future.sequence(futures).map { result => result.toList.flatten }
+  }
+
+
+
+
+
+  /**
+   * Private methods specific to the Asynchbase implementation.
+   */
+  private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = {
+    rpc match {
+      case getRequest: GetRequest => client.get(getRequest)
+      case scanner: Scanner =>
+        scanner.nextRows().withCallback { kvsLs =>
+          val ls = new util.ArrayList[KeyValue]
+          if (kvsLs != null) {
+            kvsLs.foreach { kvs =>
+              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
+            }
+          }
+          scanner.close()
+          ls
+        }.recoverWith { ex =>
+          logger.error(s"fetchKeyValuesInner failed.", ex)
+          scanner.close()
+          emptyKeyValues
+        }
+      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
+    }
+  }
+
+  private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = {
+    hbaseRpc match {
+      case getRequest: GetRequest => getRequest.key()
+      case scanner: Scanner => scanner.getCurrentKey()
+      case _ =>
+        logger.error(s"toCacheKeyBytes failed. not supported class type. 
$hbaseRpc")
+        Array.empty[Byte]
+    }
+  }
+
   private def getSecureClusterAdmin(zkAddr: String) = {
     val jaas = config.getString("java.security.auth.login.config")
     val krb5Conf = config.getString("java.security.krb5.conf")
@@ -855,6 +410,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer,
     val keytab = config.getString("keytab")
 
 
+
     System.setProperty("java.security.auth.login.config", jaas)
     System.setProperty("java.security.krb5.conf", krb5Conf)
     // System.setProperty("sun.security.krb5.debug", "true")
@@ -922,5 +478,4 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer,
   private def getEndKey(regionCount: Int): Array[Byte] = {
     Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
   }
-
-}
+}
\ No newline at end of file
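
The new fetch path above leans on the Extensions.DeferOps enrichment
(withCallback, recoverWith, toFuture) to bridge asynchbase's Deferred into
Scala's Future world. A minimal sketch of such an adapter is shown below,
assuming only the stock asynchbase Callback/Deferred API; the bodies are
illustrative, not the project's exact implementation.

    import com.stumbleupon.async.{Callback, Deferred}
    import scala.concurrent.{Future, Promise}

    // Illustrative Deferred -> Future bridge in the spirit of Extensions.DeferOps.
    implicit class DeferOps[T](defer: Deferred[T]) {

      // Map over the Deferred's result, as fetchKeyValuesInner does above.
      def withCallback[R](op: T => R): Deferred[R] =
        defer.addCallback(new Callback[R, T] {
          override def call(arg: T): R = op(arg)
        })

      // Substitute a fallback Deferred when the chain fails, as the scanner path does.
      def recoverWith(op: Exception => Deferred[T]): Deferred[T] =
        defer.addErrback(new Callback[Deferred[T], Exception] {
          override def call(ex: Exception): Deferred[T] = op(ex)
        })

      // Complete a Scala Promise from the Deferred; asynchbase hands the
      // Exception itself to addBoth on the failure path.
      def toFuture: Future[T] = {
        val promise = Promise[T]()
        defer.addBoth(new Callback[Unit, T] {
          override def call(arg: T): Unit = arg match {
            case e: Exception => promise.failure(e)
            case _            => promise.success(arg)
          }
        })
        promise.future
      }
    }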

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala
deleted file mode 100644
index 25fe642..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.storage.{SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId, VertexId}
-import org.apache.hadoop.hbase.util.Bytes
-
-
-trait HDeserializable[E] extends StorageDeserializable[E] {
-  import StorageDeserializable._
-
-  type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
-
-  /** version 1 and version 2 share the same code for parsing the row key part */
-  def parseRow(kv: SKeyValue, version: String): RowKeyRaw = {
-    var pos = 0
-    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
-    pos += srcIdLen
-    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-    pos += 4
-    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-
-    val rowLen = srcIdLen + 4 + 1
-    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala
deleted file mode 100644
index cc21cba..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.storage.StorageSerializable
-
-object HSerializable {
-  val vertexCf = "v".getBytes()
-  val edgeCf = "e".getBytes()
-}
-
-trait HSerializable[E] extends StorageSerializable[E]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala
deleted file mode 100644
index 64ac1cf..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, SKeyValue}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.logger
-import org.apache.hadoop.hbase.util.Bytes
-
-class IndexEdgeDeserializable extends HDeserializable[IndexEdge] {
-
-  import StorageDeserializable._
-
-  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
-  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
-  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-    val degree = Bytes.toLong(kv.value)
-    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
-    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-  }
-
-  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-    var qualifierLen = 0
-    var pos = 0
-    val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-      val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
-      pos = endAt
-      qualifierLen += endAt
-      val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-        (HBaseType.defaultTgtVertexId, 0)
-      } else {
-        TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
-      }
-      qualifierLen += tgtVertexIdLen
-      (props, endAt, tgtVertexId, tgtVertexIdLen)
-    }
-    val (op, opLen) =
-      if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-      else (kv.qualifier(qualifierLen), 1)
-
-    qualifierLen += opLen
-
-    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-  }
-
-  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
-    (props, endAt)
-  }
-
-  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-    (Array.empty[(Byte, InnerValLike)], 0)
-  }
-
-
-
-  /** version 1 and version 2 use the same logic */
-  override def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[IndexEdge] = None): IndexEdge = {
-    assert(_kvs.size == 1)
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-    }.getOrElse(parseRow(kv, version))
-
-    val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
-      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
-      else parseQualifier(kv, version)
-
-    val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
-      val countVal = Bytes.toLong(kv.value)
-      val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
-      (dummyProps, 8)
-    } else if (kv.qualifier.isEmpty) {
-      parseDegreeValue(kv, version)
-    } else {
-      parseValue(kv, version)
-    }
-
-    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException("invalid index seq"))
-
-
-    //    assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
-
-    val idxProps = for {
-      (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-    } yield {
-        if (k == LabelMeta.degreeSeq) k -> v
-        else seq -> v
-      }
-
-    val idxPropsMap = idxProps.toMap
-    val tgtVertexId = if (tgtVertexIdInQualifier) {
-      idxPropsMap.get(LabelMeta.toSeq) match {
-        case None => tgtVertexIdRaw
-        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
-      }
-    } else tgtVertexIdRaw
-
-    val _mergedProps = (idxProps ++ props).toMap
-    val mergedProps =
-      if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-      else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
-
-//    logger.error(s"$mergedProps")
-//    val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
-
-    val ts = kv.timestamp
-    IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala
deleted file mode 100644
index ceba4b9..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
-import com.kakao.s2graph.core.types.VertexId
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
-import org.apache.hadoop.hbase.util.Bytes
-
-case class IndexEdgeSerializable(indexEdge: IndexEdge) extends HSerializable[IndexEdge] {
-
-  import StorageSerializable._
-
-  val label = indexEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = HSerializable.edgeCf
-
-  val idxPropsMap = indexEdge.orders.toMap
-  val idxPropsBytes = propsToBytes(indexEdge.orders)
-
-  /** version 1 and version 2 share the same code for serializing the row key part */
-  override def toKeyValues: Seq[SKeyValue] = {
-    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-    val labelWithDirBytes = indexEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-//    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
-    val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
-    val qualifier =
-      if (indexEdge.op == GraphUtil.operations("incrementCount")) {
-        Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
-      } else {
-        idxPropsMap.get(LabelMeta.toSeq) match {
-          case None => Bytes.add(idxPropsBytes, tgtIdBytes)
-          case Some(vId) => idxPropsBytes
-        }
-      }
-
-    val value = propsToKeyValues(indexEdge.metas.toSeq)
-    val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
-
-    Seq(kv)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala
deleted file mode 100644
index 97f63e4..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
-import com.kakao.s2graph.core.storage.{CanSKeyValue, SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-class SnapshotEdgeDeserializable extends HDeserializable[SnapshotEdge] {
-
-  import StorageDeserializable._
-
-  override def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    queryParam.label.schemaVersion match {
-      case HBaseType.VERSION3 => fromKeyValuesInnerV3(queryParam, _kvs, version, cacheElementOpt)
-      case _ => fromKeyValuesInner(queryParam, _kvs, version, cacheElementOpt)
-    }
-  }
-
-  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
-    val statusCode = byte >> 4
-    val op = byte & ((1 << 4) - 1)
-    (statusCode.toByte, op.toByte)
-  }
-
-  private def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
-
-    val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
-    val cellVersion = kv.timestamp
-
-    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRow(kv, schemaVer))
-
-    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
-      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-      val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
-          pos += 1
-          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
-          //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
-          val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
-          Option(pendingEdge)
-        }
-
-      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
-    }
-
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
-  }
-
-  private def fromKeyValuesInnerV3[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
-
-    val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
-    val cellVersion = kv.timestamp
-    /** rowKey */
-    def parseRowV3(kv: SKeyValue, version: String) = {
-      var pos = 0
-      val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
-      pos += srcIdAndTgtIdLen
-      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-      pos += 4
-      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-
-      val rowLen = srcIdAndTgtIdLen + 4 + 1
-      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-
-    }
-    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRowV3(kv, schemaVer))
-
-    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
-    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
-
-    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-      val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
-          pos += 1
-          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
-          //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
-          val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
-          Option(pendingEdge)
-        }
-
-      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
-    }
-
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
-  }
-}
-
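
The statusCodeWithOp helper above splits a single byte into the snapshot
edge's status code (high four bits) and operation (low four bits); the
serializer below packs them the same way. A quick worked round trip, with
arbitrarily chosen values:

    val statusCode: Byte = 2
    val op: Byte = 3

    val packed = ((statusCode << 4) | op).toByte         // 0x23 == 35

    val unpackedStatus = (packed >> 4).toByte            // 2
    val unpackedOp = (packed & ((1 << 4) - 1)).toByte    // 3

    assert(unpackedStatus == statusCode && unpackedOp == op)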

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala
deleted file mode 100644
index a9e77dc..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import java.util.UUID
-
-import com.kakao.s2graph.core.SnapshotEdge
-import com.kakao.s2graph.core.mysqls.LabelIndex
-import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
-import com.kakao.s2graph.core.types.{HBaseType, SourceAndTargetVertexIdPair, VertexId}
-import com.kakao.s2graph.core.utils.logger
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.util.Random
-
-class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends HSerializable[SnapshotEdge] {
-  import StorageSerializable._
-
-  val label = snapshotEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = HSerializable.edgeCf
-
-  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
-    val byte = ((statusCode << 4) | op).toByte
-    Array(byte)
-  }
-  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
-    propsToKeyValuesWithTs(snapshotEdge.props.toList))
-
-  override def toKeyValues: Seq[SKeyValue] = {
-    label.schemaVersion match {
-      case HBaseType.VERSION3 => toKeyValuesInnerV3
-      case _ => toKeyValuesInner
-    }
-  }
-
-  private def toKeyValuesInner: Seq[SKeyValue] = {
-    val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
-    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-    val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
-
-    val qualifier = tgtIdBytes
-
-    val value = snapshotEdge.pendingEdgeOpt match {
-      case None => valueBytes()
-      case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
-        val versionBytes = Array.empty[Byte]
-//          Bytes.toBytes(snapshotEdge.version)
-        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
-        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-//          Array.empty[Byte]
-//          snapshotEdge.lockedAtOpt.map(lockedAt => Bytes.toBytes(lockedAt)).getOrElse(Array.empty[Byte])
-        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
-    }
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
-
-  private def toKeyValuesInnerV3: Seq[SKeyValue] = {
-    val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
-    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
-    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
-    val qualifier = Array.empty[Byte]
-
-    val value = snapshotEdge.pendingEdgeOpt match {
-      case None => valueBytes()
-      case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
-        val versionBytes = Array.empty[Byte]
-//          Bytes.toBytes(snapshotEdge.version)
-        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
-        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-//          Array.empty[Byte]
-//          snapshotEdge.lockedAtOpt.map(lockedAt => Bytes.toBytes(lockedAt)).getOrElse(Array.empty[Byte])
-//        logger.error(s"ValueBytes: ${valueBytes().toList}")
-//        logger.error(s"opBytes: ${opBytes.toList}")
-//        logger.error(s"versionBytes: ${versionBytes.toList}")
-//        logger.error(s"PropsBytes: ${propsBytes.toList}")
-//        logger.error(s"LockBytes: ${lockBytes.toList}")
-        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
-    }
-
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala
deleted file mode 100644
index 4c0ca19..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.storage.CanSKeyValue
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import com.kakao.s2graph.core.{QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.collection.mutable.ListBuffer
-
-class VertexDeserializable extends HDeserializable[Vertex] {
-  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam,
-                                     _kvs: Seq[T],
-                                     version: String,
-                                     cacheElementOpt: Option[Vertex]): Vertex = {
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
-
-    var maxTs = Long.MinValue
-    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
-    val belongLabelIds = new ListBuffer[Int]
-
-    for {
-      kv <- kvs
-    } {
-      val propKey =
-        if (kv.qualifier.length == 1) kv.qualifier.head.toInt
-        else Bytes.toInt(kv.qualifier)
-
-      val ts = kv.timestamp
-      if (ts > maxTs) maxTs = ts
-
-      if (Vertex.isLabelId(propKey)) {
-        belongLabelIds += Vertex.toLabelId(propKey)
-      } else {
-        val v = kv.value
-        val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
-        propsMap += (propKey -> value)
-      }
-    }
-    assert(maxTs != Long.MinValue)
-    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala
deleted file mode 100644
index 370b844..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.Vertex
-import com.kakao.s2graph.core.storage.SKeyValue
-import org.apache.hadoop.hbase.util.Bytes
-
-case class VertexSerializable(vertex: Vertex) extends HSerializable[Vertex] {
-
-  val cf = HSerializable.vertexCf
-
-  override def toKeyValues: Seq[SKeyValue] = {
-    val row = vertex.id.bytes
-    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
-    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
-    (base ++ belongsTo).map { case (qualifier, value) =>
-      SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
-    } toSeq
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
new file mode 100644
index 0000000..6777c28
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
@@ -0,0 +1,82 @@
+package com.kakao.s2graph.core.utils
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.CacheBuilder
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.Config
+
+import scala.concurrent.ExecutionContext
+
+class DeferCache[R](config: Config)(implicit ex: ExecutionContext) {
+
+  import com.kakao.s2graph.core.utils.Extensions.DeferOps
+
+  type Value = (Long, Deferred[R])
+
+  private val maxSize = config.getInt("future.cache.max.size")
+  private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
+  private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  private val futureCache = CacheBuilder.newBuilder()
+  .initialCapacity(maxSize)
+  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+  .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+  .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
+  .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]()
+
+
+  def asMap() = futureCache.asMap()
+
+  def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey)
+
+  private def checkAndExpire(cacheKey: Long,
+                             cachedAt: Long,
+                             cacheTTL: Long,
+                             oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = {
+    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
+      // The future is too old, so expire it and fetch fresh data from storage.
+      futureCache.asMap().remove(cacheKey)
+
+      val newPromise = new Deferred[R]()
+      val now = System.currentTimeMillis()
+
+      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
+        case null =>
+          // Only one thread succeeds in getting here concurrently:
+          // initiate the fetch to storage, then add a callback on completion to finish the promise.
+          op withCallback { value =>
+            newPromise.callback(value)
+            value
+          }
+          newPromise
+        case (cachedAt, oldDefer) => oldDefer
+      }
+    } else {
+      // The future is not too old, so reuse it.
+      oldDefer
+    }
+  }
+  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = {
+    val cacheVal = futureCache.getIfPresent(cacheKey)
+    cacheVal match {
+      case null =>
+        val promise = new Deferred[R]()
+        val now = System.currentTimeMillis()
+        val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
+          case null =>
+            op.withCallback { value =>
+              promise.callback(value)
+              value
+            }
+            (now, promise)
+          case oldVal => oldVal
+        }
+        checkAndExpire(cacheKey, cachedAt, cacheTTL, defer)(op)
+
+      case (cachedAt, defer) =>
+        checkAndExpire(cacheKey, cachedAt, cacheTTL, defer)(op)
+    }
+  }
+}
+
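
DeferCache extracts the Deferred-backed future cache that AsynchbaseStorage
previously built inline: concurrent callers asking for the same key share one
in-flight Deferred, and a cached entry is replaced only once it is older than
the caller-supplied TTL. A hypothetical caller, mirroring the getVertices
pattern shown earlier (the function name, the HBaseClient wiring, and the
10-second TTL are assumptions for illustration):

    import java.util
    import org.hbase.async.{GetRequest, HBaseClient, KeyValue}
    import scala.util.hashing.MurmurHash3

    // Hypothetical wiring: cache and client come from the surrounding storage,
    // e.g. val cache = new DeferCache[util.ArrayList[KeyValue]](config).
    def cachedGet(cache: DeferCache[util.ArrayList[KeyValue]],
                  client: HBaseClient,
                  get: GetRequest) = {
      val cacheKey = MurmurHash3.stringHash(get.toString)
      // Concurrent callers for the same key share one in-flight Deferred;
      // after 10 seconds a fresh fetch replaces the cached one.
      cache.getOrElseUpdate(cacheKey, cacheTTL = 10000L)(client.get(get))
    }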

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
index 4858f60..eea9a79 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
@@ -2,7 +2,6 @@ package com.kakao.s2graph.core.utils
 
 import com.stumbleupon.async.{Callback, Deferred}
 import com.typesafe.config.Config
-
 import scala.concurrent.{ExecutionContext, Future, Promise}
 
 object Extensions {
@@ -70,5 +69,4 @@ object Extensions {
     def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean =
       if (config.hasPath(key)) config.getBoolean(key) else defaultValue
   }
-
 }
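
For reference, the deleteAllAdjacentEdges path earlier in this commit retries
through Extensions.retryOnSuccess(MaxRetryNum) { op } { predicate }. A helper
with that call shape could look like the sketch below; it is an illustration
of the pattern, not the project's exact implementation (which may, for
instance, back off between attempts).

    import scala.concurrent.{ExecutionContext, Future}

    // Re-run op until predicate accepts the result or maxRetryNum attempts
    // are exhausted, resolving with the last result either way.
    def retryOnSuccess[T](maxRetryNum: Int)(op: => Future[T])(predicate: T => Boolean)
                         (implicit ec: ExecutionContext): Future[T] =
      op.flatMap { result =>
        if (predicate(result) || maxRetryNum <= 1) Future.successful(result)
        else retryOnSuccess(maxRetryNum - 1)(op)(predicate)
      }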
