http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
new file mode 100644
index 0000000..34e9fcb
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -0,0 +1,1224 @@
+package org.apache.s2graph.core.storage
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val}
+import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.storage.serde.indexedge.wide.{IndexEdgeDeserializable, IndexEdgeSerializable}
+import org.apache.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable
+import org.apache.s2graph.core.storage.serde.vertex.{VertexDeserializable, VertexSerializable}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.{Extensions, logger}
+
+import scala.annotation.tailrec
+import scala.collection.Seq
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Random, Try}
+
+abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
+  import HBaseType._
+
+  /** storage dependent configurations */
+  val MaxRetryNum = config.getInt("max.retry.number")
+  val MaxBackOff = config.getInt("max.back.off")
+  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
+  val FailProb = config.getDouble("hbase.fail.prob")
+  val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000)
+  val maxSize = config.getInt("future.cache.max.size")
+  val expireAfterWrite = config.getInt("future.cache.expire.after.write")
+  val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  /**
+   * Compatibility table
+   * | label schema version | snapshot edge | index edge | vertex | note |
+   * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this; it exists only for backward compatibility |
+   * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this; it exists only for backward compatibility |
+   * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase; current stable schema |
+   * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema; uses scanner instead of get |
+   *
+   */
+
+  /**
+   * create a serializer that knows how to convert the given snapshotEdge into kvs: Seq[SKeyValue]
+   * so we can store these kvs.
+   * @param snapshotEdge: snapshotEdge to serialize
+   * @return serializer implementation of StorageSerializable whose toKeyValues returns Seq[SKeyValue]
+   */
+  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): Serializable[SnapshotEdge] = {
+    snapshotEdge.schemaVer match {
+      case VERSION1 | VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
+      case VERSION3 | VERSION4 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
+      case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}")
+    }
+  }
+
+  /**
+   * create a serializer that knows how to convert the given indexEdge into kvs: Seq[SKeyValue]
+   * @param indexEdge: indexEdge to serialize
+   * @return serializer implementation
+   */
+  def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
+    indexEdge.schemaVer match {
+      case VERSION1 | VERSION2 | VERSION3 => new IndexEdgeSerializable(indexEdge)
+      case VERSION4 => new serde.indexedge.tall.IndexEdgeSerializable(indexEdge)
+      case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}")
+    }
+  }
+
+  /**
+   * create a serializer that knows how to convert the given vertex into kvs: Seq[SKeyValue]
+   * @param vertex: vertex to serialize
+   * @return serializer implementation
+   */
+  def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
+
+  /**
+   * create a deserializer that can parse a stored CanSKeyValue into a snapshotEdge.
+   * note that each storage implementation should implement an implicit type class
+   * to convert its storage-dependent data type into the common SKeyValue type by implementing CanSKeyValue.
+   *
+   * ex) Asynchbase uses its KeyValue class, and the CanSKeyValue object has an implicit type conversion method.
+   * if any storage uses a different class to represent stored byte arrays,
+   * then that storage implementation is responsible for providing an implicit type conversion method on CanSKeyValue.
+   * */
+
+  val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = Map(
+    VERSION1 -> new SnapshotEdgeDeserializable,
+    VERSION2 -> new SnapshotEdgeDeserializable,
+    VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable,
+    VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable
+  )
+  def snapshotEdgeDeserializer(schemaVer: String) =
+    snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
+
+  /** create a deserializer that can parse a stored CanSKeyValue into an indexEdge. */
+  val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map(
+    VERSION1 -> new IndexEdgeDeserializable,
+    VERSION2 -> new IndexEdgeDeserializable,
+    VERSION3 -> new IndexEdgeDeserializable,
+    VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable
+  )
+
+  def indexEdgeDeserializer(schemaVer: String) =
+    indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
+
+  /** create a deserializer that can parse a stored CanSKeyValue into a vertex. */
+  val vertexDeserializer = new VertexDeserializable
+
+
+  /**
+   * decide how to store the given key values Seq[SKeyValue] into storage using the storage's client.
+   * note that this should return true only when all writes succeed.
+   * we assume that each storage implementation has a client as a member variable.
+   *
+   *
+   * @param cluster: where these key values should be stored.
+   * @param kvs: sequence of SKeyValue that need to be stored in storage.
+   * @param withWait: flag to control waiting for an ack from storage.
+   *                  note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
+   *                  it never blocks a thread, but rather submits work and is notified by the event loop when storage sends the ack back.
+   * @return ack message from storage.
+   */
+  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean]
+
+//  def writeToStorage(kv: SKeyValue, withWait: Boolean): Future[Boolean]
+
+  /**
+   * fetch the SnapshotEdge for the given request from storage.
+   * the storage data type should also be converted into SKeyValue.
+   * note that the return type is a sequence rather than a single SKeyValue for simplicity,
+   * even though there is an assertion that sequence.length == 1.
+   * @param request
+   * @return
+   */
+  def fetchSnapshotEdgeKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
+
+  /**
+   * write requestKeyValue into storage only if the currently stored value matches the expected one.
+   * note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
+   *
+   * most importantly, this has to be an 'atomic' operation.
+   * while this operation is mutating requestKeyValue's snapshotEdge, other threads need to be
+   * either blocked or failed on the write-write conflict case.
+   *
+   * also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized to
+   * prevent reads of wrong data.
+   *
+   * the best approach is to use the storage's concurrency control (either pessimistic or optimistic),
+   * such as transactions or compareAndSet, to synchronize.
+   *
+   * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+   * for storage that does not support concurrency control, the storage implementation
+   * itself can maintain manual locks that synchronize reads (fetchSnapshotEdgeKeyValues)
+   * and writes (writeLock).
+   * @param requestKeyValue
+   * @param expectedOpt
+   * @return
+   */
+  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean]
+
+  /**
+   * build the storage-specific request used to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
+   * for example, Asynchbase uses GetRequest and Scanner, so this method is responsible for building the
+   * client request (GetRequest, Scanner) based on the user-provided query.
+   * @param queryRequest
+   * @return
+   */
+  def buildRequest(queryRequest: QueryRequest): AnyRef
+
+  /**
+   * fetch IndexEdges for the given queryParam in queryRequest.
+   * this expects the previous step's starting score so scores can propagate into the next step.
+   * parentEdges is also necessary to return the full bfs tree when the query requires it.
+   *
+   * note that the return type is generic.
+   * for example, we currently want to use Asynchbase,
+   * so the return type for a single I/O should be Deferred[T].
+   *
+   * if we used the native hbase client, this return type could be Future[T] or just T.
+   * @param queryRequest
+   * @param prevStepScore
+   * @param isInnerCall
+   * @param parentEdges
+   * @return
+   */
+  def fetch(queryRequest: QueryRequest,
+            prevStepScore: Double,
+            isInnerCall: Boolean,
+            parentEdges: Seq[EdgeWithScore]): R
+
+  /**
+   * responsible for firing parallel fetch calls into storage and creating a future that returns the merged result.
+   * @param queryRequestWithScoreLs
+   * @param prevStepEdges
+   * @return
+   */
+  def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
+
+  /**
+   * fetch Vertex for given request from storage.
+   * @param request
+   * @return
+   */
+  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
+
+  /**
+   * decide how to apply the given edges (indexProps values + Map(_count -> countVal)) into storage.
+   * @param edges
+   * @param withWait
+   * @return
+   */
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]]
+
+  /**
+   * this method needs to be called when the client shuts down. it is responsible for cleaning up
+   * resources, such as the storage client.
+   */
+  def flush(): Unit
+
+  /**
+   * create table on storage.
+   * if the storage implementation does not support namespaces or tables, then there is nothing to be done
+   * @param zkAddr
+   * @param tableName
+   * @param cfs
+   * @param regionMultiplier
+   * @param ttl
+   * @param compressionAlgorithm
+   */
+  def createTable(zkAddr: String,
+                  tableName: String,
+                  cfs: List[String],
+                  regionMultiplier: Int,
+                  ttl: Option[Int],
+                  compressionAlgorithm: String): Unit
+
+
+
+
+
+  /** Public Interface */
+
+  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+    def fromResult(queryParam: QueryParam,
+                   kvs: Seq[SKeyValue],
+                   version: String): Option[Vertex] = {
+      if (kvs.isEmpty) None
+      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+    }
+
+    val futures = vertices.map { vertex =>
+      val queryParam = QueryParam.Empty
+      val q = Query.toQuery(Seq(vertex), queryParam)
+      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+      fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs =>
+        fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion)
+      } recoverWith { case ex: Throwable =>
+        Future.successful(None)
+      }
+    }
+
+    Future.sequence(futures).map { result => result.toList.flatten }
+  }
+
+  def mutateElements(elements: Seq[GraphElement],
+                     withWait: Boolean = false): Future[Seq[Boolean]] = {
+
+    val edgeBuffer = ArrayBuffer[Edge]()
+    val vertexBuffer = ArrayBuffer[Vertex]()
+
+    elements.foreach {
+      case e: Edge => edgeBuffer += e
+      case v: Vertex => vertexBuffer += v
+      case any@_ => logger.error(s"Unknown type: ${any}")
+    }
+
+    val edgeFuture = mutateEdges(edgeBuffer, withWait)
+    val vertexFuture = mutateVertices(vertexBuffer, withWait)
+
+    val graphFuture = for {
+      edgesMutated <- edgeFuture
+      verticesMutated <- vertexFuture
+    } yield edgesMutated ++ verticesMutated
+
+    graphFuture
+  }
+
+  def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+    val (strongEdges, weakEdges) =
+      edges.partition(e => e.label.consistencyLevel == "strong" || e.op == GraphUtil.operations("insertBulk"))
+
+    val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) =>
+      val mutations = edges.flatMap { edge =>
+        val (_, edgeUpdate) =
+          if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge)
+          else Edge.buildOperation(None, Seq(edge))
+        buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++
+          snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
+      }
+      writeToStorage(zkQuorum, mutations, withWait)
+    }
+    val strongEdgesFutures = mutateStrongEdges(strongEdges, withWait)
+    for {
+      weak <- Future.sequence(weakEdgesFutures)
+      strong <- strongEdgesFutures
+    } yield {
+      strong ++ weak
+    }
+  }
+  def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+
+    val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) }.toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val (deleteAllEdges, edges) = edgeGroup.partition(_.op == GraphUtil.operations("deleteAll"))
+
+      // DeleteAll first
+      val deleteAllFutures = deleteAllEdges.map { edge =>
+        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts)
+      }
+
+      // After deleteAll, process others
+      lazy val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          //          val strongConsistency = edges.head.label.consistencyLevel == "strong"
+          //          if (strongConsistency) {
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)(Edge.buildOperation)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = buildVertexPutsAsync(head)
+          val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, withWait)
+          Seq(edgeFuture, vertexFuture)
+        //          } else {
+        //            edges.map { edge => mutateEdge(edge, withWait = withWait) }
+        //          }
+        case Nil => Nil
+      }
+
+      val composed = for {
+        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield deleteRet ++ mutateRet
+
+      composed.map(_.forall(identity))
+    }
+
+    Future.sequence(mutateEdges)
+  }
+
+  def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = {
+    if (vertex.op == GraphUtil.operations("delete")) {
+      writeToStorage(vertex.hbaseZkAddr,
+        vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+      logger.info(s"deleteAll for vertex is truncated. $vertex")
+      Future.successful(true) // Ignore the withWait parameter, because a deleteAll operation may take a long time
+    } else {
+      writeToStorage(vertex.hbaseZkAddr, buildPutsAll(vertex), withWait)
+    }
+  }
+
+  def mutateVertices(vertices: Seq[Vertex],
+                     withWait: Boolean = false): Future[Seq[Boolean]] = {
+    val futures = vertices.map { vertex => mutateVertex(vertex, withWait) }
+    Future.sequence(futures)
+  }
+
+
+  def mutateEdgesInner(edges: Seq[Edge],
+                       checkConsistency: Boolean,
+                       withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => (Edge, EdgeMutate)): Future[Boolean] = {
+    if (!checkConsistency) {
+      val zkQuorum = edges.head.label.hbaseZkAddr
+      val futures = edges.map { edge =>
+        val (_, edgeUpdate) = f(None, Seq(edge))
+        val mutations =
+          indexedEdgeMutations(edgeUpdate) ++
+            snapshotEdgeMutations(edgeUpdate) ++
+            increments(edgeUpdate)
+        writeToStorage(zkQuorum, mutations, withWait)
+      }
+      Future.sequence(futures).map { rets => rets.forall(identity) }
+    } else {
+      def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = {
+
+        fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+
+          val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges)
+          logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}")
+          //shouldReplace false.
+          if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) {
+            logger.debug(s"${newEdge.toLogString} drop.")
+            Future.successful(true)
+          } else {
+            commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, edgeUpdate).map { ret =>
+              if (ret) {
+                logger.info(s"[Success] commit: \n${_edges.map(_.toLogString).mkString("\n")}")
+              } else {
+                throw new PartialFailureException(newEdge, 3, "commit failed.")
+              }
+              true
+            }
+          }
+        }
+      }
+      def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: (Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = {
+        if (tryNum >= MaxRetryNum) {
+          edges.foreach { edge =>
+            logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
+            ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = edge))
+          }
+          Future.successful(false)
+        } else {
+          val future = fn(edges, statusCode)
+          future.onSuccess {
+            case success =>
+              logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
+          }
+          future recoverWith {
+            case FetchTimeoutException(retryEdge) =>
+              logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
+              retry(tryNum + 1)(edges, statusCode)(fn)
+
+            case PartialFailureException(retryEdge, failedStatusCode, failReason) =>
+              val status = failedStatusCode match {
+                case 0 => "AcquireLock failed."
+                case 1 => "Mutation failed."
+                case 2 => "Increment failed."
+                case 3 => "ReleaseLock failed."
+                case 4 => "Unknown"
+              }
+
+              Thread.sleep(Random.nextInt(MaxBackOff))
+              logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${failReason}")
+              retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn)
+            case ex: Exception =>
+              logger.error("Unknown exception", ex)
+              Future.successful(false)
+          }
+        }
+      }
+      retry(1)(edges, 0)(commit)
+    }
+  }
+
+  def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
+                newEdge: Edge, edgeMutate: EdgeMutate) =
+    Seq("----------------------------------------------",
+      s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
+      s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
+      s"newEdge: ${newEdge.toLogString}",
+      s"mutation: \n${edgeMutate.toLogString}",
+      "----------------------------------------------").mkString("\n")
+
+
+  /** Delete All */
+  protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
+                                              queryResult: QueryResult,
+                                              requestTs: Long,
+                                              retryNum: Int): Future[Boolean] = {
+    val queryParam = queryRequest.queryParam
+    val zkQuorum = queryParam.label.hbaseZkAddr
+    val futures = for {
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+    } yield {
+        /** reversed direction */
+        val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            buildIncrementsAsync(indexEdge, -1L)
+        }
+        val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+        val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge =>
+          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            buildIncrementsAsync(indexEdge, -1L)
+        }
+        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+        writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+    Future.sequence(futures).map { rets => rets.forall(identity) }
+  }
+
+  protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = {
+    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get
+    val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
+      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+    }.map { edgeWithScore =>
+      val label = queryRequest.queryParam.label
+      val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
+        case "strong" =>
+          val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
+            Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+          (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
+        case _ =>
+          val oldEdge = edgeWithScore.edge
+          (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+      }
+
+      val copiedEdge =
+        edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
+
+      val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
+//      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
+      edgeToDelete
+    }
+
+    queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
+  }
+
+  protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
+    val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
+    queryResultLs.foreach { queryResult =>
+      if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+    }
+    val futures = for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+      deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
+      if deleteQueryResult.edgeWithScoreLs.nonEmpty
+    } yield {
+        val label = queryRequest.queryParam.label
+        label.schemaVersion match {
+          case HBaseType.VERSION3 | HBaseType.VERSION4 =>
+            if (label.consistencyLevel == "strong") {
+              /**
+               * read: snapshotEdge on queryResult = O(N)
+               * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
+               */
+              mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity))
+            } else {
+              deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+            }
+          case _ =>
+
+            /**
+             * read: x
+             * write: N x (1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
+             */
+            deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum)
+        }
+      }
+
+    if (futures.isEmpty) {
+      // all deleted.
+      Future.successful(true -> true)
+    } else {
+      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
+    }
+  }
+
+  protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = {
+    val future = for {
+      queryRequestWithResultLs <- getEdges(query)
+      (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs)
+    } yield {
+//        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
+        (allDeleted, ret)
+      }
+
+    Extensions.retryOnFailure(MaxRetryNum) {
+      future
+    } {
+      logger.error(s"fetch and deleteAll failed.")
+      (true, false)
+    }
+
+  }
+
+  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
+                             labels: Seq[Label],
+                             dir: Int,
+                             ts: Long): Future[Boolean] = {
+
+    def enqueueLogMessage() = {
+      val kafkaMessages = for {
+        vertice <- srcVertices
+        id = vertice.innerId.toIdString()
+        label <- labels
+      } yield {
+          val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t")
+          val topic = ExceptionHandler.failTopic
+          val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
+          kafkaMsg
+        }
+
+      ExceptionHandler.enqueues(kafkaMessages)
+    }
+
+    val requestTs = ts
+    val queryParams = for {
+      label <- labels
+    } yield {
+        val labelWithDir = LabelWithDirection(label.id.get, dir)
+        QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
+      }
+
+    val step = Step(queryParams.toList)
+    val q = Query(srcVertices, Vector(step))
+
+    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
+    val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
+      fetchAndDeleteAll(q, requestTs)
+    } { case (allDeleted, deleteSuccess) =>
+      allDeleted && deleteSuccess
+    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
+
+    retryFuture onFailure {
+      case ex =>
+        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
+        enqueueLogMessage()
+    }
+
+    retryFuture
+  }
+
+  /** End Of Delete All */
+
+
+
+
+  /** Parsing Logic: parse a kv from Storage into an Edge */
+  def toEdge[K: CanSKeyValue](kv: K,
+                              queryParam: QueryParam,
+                              cacheElementOpt: Option[IndexEdge],
+                              parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
+//        logger.debug(s"toEdge: $kv")
+    try {
+      val schemaVer = queryParam.label.schemaVersion
+      val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+      indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges))
+    } catch {
+      case ex: Exception =>
+        logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}", ex)
+        None
+    }
+  }
+
+  def toSnapshotEdge[K: CanSKeyValue](kv: K,
+                                      queryParam: QueryParam,
+                                      cacheElementOpt: Option[SnapshotEdge] = None,
+                                      isInnerCall: Boolean,
+                                      parentEdges: Seq[EdgeWithScore]): Option[Edge] = {
+//        logger.debug(s"SnapshottoEdge: $kv")
+    val schemaVer = queryParam.label.schemaVersion
+    val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+
+    if (isInnerCall) {
+      snapshotEdgeOpt.flatMap { snapshotEdge =>
+        val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+        if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
+        else None
+      }
+    } else {
+      snapshotEdgeOpt.flatMap { snapshotEdge =>
+        if (Edge.allPropsDeleted(snapshotEdge.props)) None
+        else {
+          val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+          if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
+          else None
+        }
+      }
+    }
+  }
+
+  def toEdges[K: CanSKeyValue](kvs: Seq[K],
+                               queryParam: QueryParam,
+                               prevScore: Double = 1.0,
+                               isInnerCall: Boolean,
+                               parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    if (kvs.isEmpty) Seq.empty
+    else {
+      val first = kvs.head
+      val kv = first
+      val schemaVer = queryParam.label.schemaVersion
+      val cacheElementOpt =
+        if (queryParam.isSnapshotEdge) None
+        else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
+
+      for {
+        kv <- kvs
+        edge <-
+        if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges)
+        else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
+      } yield {
+        //TODO: Refactor this.
+        val currentScore =
+          queryParam.scorePropagateOp match {
+            case "plus" => edge.rank(queryParam.rank) + prevScore
+            case _ => edge.rank(queryParam.rank) * prevScore
+          }
+        EdgeWithScore(edge, currentScore)
+      }
+    }
+  }
+
+  /** End Of Parse Logic */
+
+//  /** methods for consistency */
+//  protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
+//    if (elementRpcs.isEmpty) {
+//      Future.successful(true)
+//    } else {
+//      val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) }
+//      Future.sequence(futures).map(_.forall(identity))
+//    }
+//  }
+
+  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends Exception
+
+  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = {
+    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n")
+    logger.debug(msg)
+  }
+
+  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = {
+    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
+      s"${edgeMutate.toLogString}").mkString("\n")
+    logger.debug(msg)
+  }
+
+  protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[SKeyValue]) = {
+    val currentTs = System.currentTimeMillis()
+    val lockTs = snapshotEdgeOpt match {
+      case None => Option(currentTs)
+      case Some(snapshotEdge) =>
+        snapshotEdge.pendingEdgeOpt match {
+          case None => Option(currentTs)
+          case Some(pendingEdge) => pendingEdge.lockTs
+        }
+    }
+    val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1
+    //      snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
+    val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs)
+    val base = snapshotEdgeOpt match {
+      case None =>
+        // no one has ever mutated this snapshotEdge.
+        edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+      case Some(snapshotEdge) =>
+        // at least one mutation has already succeeded on this snapshotEdge.
+        snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
+    }
+    base.copy(version = newVersion, statusCode = 1, lockTs = None)
+  }
+
+  protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge,
+                                     edgeMutate: EdgeMutate) = {
+    val newVersion = lockEdge.version + 1
+    val base = edgeMutate.newSnapshotEdge match {
+      case None =>
+        // shouldReplace false
+        assert(snapshotEdgeOpt.isDefined)
+        snapshotEdgeOpt.get.toSnapshotEdge
+      case Some(newSnapshotEdge) => newSnapshotEdge
+    }
+    base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None)
+  }
+
+  protected def acquireLock(statusCode: Byte,
+                            edge: Edge,
+                            oldSnapshotEdgeOpt: Option[Edge],
+                            lockEdge: SnapshotEdge,
+                            oldBytes: Array[Byte]): Future[Boolean] = {
+    if (statusCode >= 1) {
+      logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p")
+      else {
+        val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
+        val oldPut = oldSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
+//        val lockEdgePut = buildPutAsync(lockEdge).head
+//        val oldPut = oldSnapshotEdgeOpt.map(e => buildPutAsync(e.toSnapshotEdge).head)
+        writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
+          logger.error(s"AcquireLock RPC Failed.")
+          throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed")
+        }.map { ret =>
+          if (ret) {
+            val log = Seq(
+              "\n",
+              "=" * 50,
+              s"[Success]: acquireLock",
+              s"[RequestEdge]: ${edge.toLogString}",
+              s"[LockEdge]: ${lockEdge.toLogString()}",
+              s"[PendingEdge]: 
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
+              "=" * 50, "\n").mkString("\n")
+
+            logger.debug(log)
+            //            debug(ret, "acquireLock", edge.toSnapshotEdge)
+          } else {
+            throw new PartialFailureException(edge, 0, "hbase fail.")
+          }
+          true
+        }
+      }
+    }
+  }
+
+
+
+  protected def releaseLock(predicate: Boolean,
+                            edge: Edge,
+                            lockEdge: SnapshotEdge,
+                            releaseLockEdge: SnapshotEdge,
+                            _edgeMutate: EdgeMutate,
+                            oldBytes: Array[Byte]): Future[Boolean] = {
+    if (!predicate) {
+      throw new PartialFailureException(edge, 3, "predicate failed.")
+    }
+    val p = Random.nextDouble()
+    if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p")
+    else {
+      val releaseLockEdgePut = snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head
+      val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
+      writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith {
+        case ex: Exception =>
+          logger.error(s"ReleaseLock RPC Failed.")
+          throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed")
+      }.map { ret =>
+        if (ret) {
+          debug(ret, "releaseLock", edge.toSnapshotEdge)
+        } else {
+          val msg = Seq("\nFATAL ERROR\n",
+            "=" * 50,
+            oldBytes.toList,
+            lockEdgePut,
+            releaseLockEdgePut,
+            //            lockEdgePut.value.toList,
+            //            releaseLockEdgePut.value().toList,
+            "=" * 50,
+            "\n"
+          )
+          logger.error(msg.mkString("\n"))
+          //          error(ret, "releaseLock", edge.toSnapshotEdge)
+          throw new PartialFailureException(edge, 3, "hbase fail.")
+        }
+        true
+      }
+    }
+  }
+
+
+  protected def mutate(predicate: Boolean,
+                       edge: Edge,
+                       statusCode: Byte,
+                       _edgeMutate: EdgeMutate): Future[Boolean] = {
+    if (!predicate) throw new PartialFailureException(edge, 1, "predicate failed.")
+
+    if (statusCode >= 2) {
+      logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p")
+      else
+        writeToStorage(edge.label.hbaseZkAddr, indexedEdgeMutations(_edgeMutate), withWait = true).map { ret =>
+          if (ret) {
+            debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate)
+          } else {
+            throw new PartialFailureException(edge, 1, "hbase fail.")
+          }
+          true
+        }
+    }
+  }
+
+  protected def increment(predicate: Boolean,
+                          edge: Edge,
+                          statusCode: Byte, _edgeMutate: EdgeMutate): Future[Boolean] = {
+    if (!predicate) throw new PartialFailureException(edge, 2, "predicate failed.")
+    if (statusCode >= 3) {
+      logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p")
+      else
+        writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate), withWait = true).map { ret =>
+          if (ret) {
+            debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate)
+          } else {
+            throw new PartialFailureException(edge, 2, "hbase fail.")
+          }
+          true
+        }
+    }
+  }
+
+
+  /** this may be overridden by a specific storage implementation */
+  protected def commitProcess(edge: Edge, statusCode: Byte)
+                             (snapshotEdgeOpt: Option[Edge], kvOpt: Option[SKeyValue])
+                             (lockEdge: SnapshotEdge, releaseLockEdge: SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = {
+    val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte])
+    for {
+      locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge, 
oldBytes)
+      mutated <- mutate(locked, edge, statusCode, _edgeMutate)
+      incremented <- increment(mutated, edge, statusCode, _edgeMutate)
+      released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, 
_edgeMutate, oldBytes)
+    } yield {
+      released
+    }
+  }
+
+  protected def commitUpdate(edge: Edge,
+                             statusCode: Byte)(snapshotEdgeOpt: Option[Edge],
+                                               kvOpt: Option[SKeyValue],
+                                               edgeUpdate: EdgeMutate): Future[Boolean] = {
+    val label = edge.label
+    def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty)
+
+    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
+    val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate)
+    val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_
+    snapshotEdgeOpt match {
+      case None =>
+        // no one has ever succeeded in acquiring the lock.
+        _process(lockEdge, releaseLockEdge, edgeUpdate)
+      //        process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
+      case Some(snapshotEdge) =>
+        // someone has succeeded in acquiring the lock at least once.
+        snapshotEdge.pendingEdgeOpt match {
+          case None =>
+            // not locked
+            _process(lockEdge, releaseLockEdge, edgeUpdate)
+          //            process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
+          case Some(pendingEdge) =>
+            def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis()
+            if (isLockExpired) {
+              val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge)
+              val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge))
+              val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt)
+              val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate)
+              commitProcess(edge, statusCode = 0)(snapshotEdgeOpt, kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret =>
+                //              process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret =>
+                val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}"
+                throw new PartialFailureException(edge, 0, log)
+              }
+            } else {
+              // locked
+              if (pendingEdge.ts == edge.ts && statusCode > 0) {
+                // self locked
+                val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge)
+                val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(edge))
+                val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, newEdgeUpdate)
+
+                /** lockEdge will be ignored */
+                _process(lockEdge, newReleaseLockEdge, newEdgeUpdate)
+                //                process(lockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode)
+              } else {
+                throw new PartialFailureException(edge, statusCode, s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]")
+              }
+            }
+        }
+    }
+  }
+
+  /** end of methods for consistency */
+
+
+  //  def futureCache[T] = Cache[Long, (Long, T)]
+
+  protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
+    val srcVertex = queryRequest.vertex
+    //    val tgtVertexOpt = queryRequest.tgtVertexOpt
+    val edgeCf = Serializable.edgeCf
+    val queryParam = queryRequest.queryParam
+    val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
+    val label = queryParam.label
+    val labelWithDir = queryParam.labelWithDir
+    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
+    val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
+      case Some(tgtVertexId) => // _to is given.
+        /** we use toSnapshotEdge, so we don't need to swap src and tgt */
+        val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
+        val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
+        (src, tgt)
+      case None =>
+        val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
+        (src, src)
+    }
+
+    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId))
+    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
+    val currentTs = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)).toMap
+    Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs)
+  }
+
+
+
+  protected def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = {
+    val labelWithDir = edge.labelWithDir
+    val queryParam = QueryParam(labelWithDir)
+    val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
+    val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
+    val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
+
+    fetchSnapshotEdgeKeyValues(buildRequest(queryRequest)).map { kvs =>
+      val (edgeOpt, kvOpt) =
+        if (kvs.isEmpty) (None, None)
+        else {
+          val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, parentEdges = Nil).headOption.map(_.edge)
+          val _kvOpt = kvs.headOption
+          (_edgeOpt, _kvOpt)
+        }
+      (queryParam, edgeOpt, kvOpt)
+    } recoverWith { case ex: Throwable =>
+      logger.error(s"fetchQueryParam failed. fallback return.", ex)
+      throw new FetchTimeoutException(s"${edge.toLogString}")
+    }
+  }
+
+  protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
+    if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
+    else {
+      val queryRequest = queryRequestWithResultsLs.head.queryRequest
+      val q = orgQuery
+      val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
+
+      val stepIdx = queryRequest.stepIdx + 1
+
+      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
+      val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
+      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
+      val step = q.steps(stepIdx)
+      val alreadyVisited =
+        if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
+        else Graph.alreadyVisitedVertices(queryResultsLs)
+
+      val groupedBy = queryResultsLs.flatMap { queryResult =>
+        queryResult.edgeWithScoreLs.map { case edgeWithScore =>
+          edgeWithScore.edge.tgtVertex -> edgeWithScore
+        }
+      }.groupBy { case (vertex, edgeWithScore) => vertex }
+
+      val groupedByFiltered = for {
+        (vertex, edgesWithScore) <- groupedBy
+        aggregatedScore = edgesWithScore.map(_._2.score).sum if aggregatedScore >= prevStepThreshold
+      } yield vertex -> aggregatedScore
+
+      val prevStepTgtVertexIdEdges = for {
+        (vertex, edgesWithScore) <- groupedBy
+      } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) => edgeWithScore }
+
+      val nextStepSrcVertices = if (prevStepLimit >= 0) {
+        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
+      } else {
+        groupedByFiltered.toSeq
+      }
+
+      val queryRequests = for {
+        (vertex, prevStepScore) <- nextStepSrcVertices
+        queryParam <- step.queryParams
+      } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
+
+      Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec)
+    }
+  }
+
+  protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
+    for {
+      queryRequestWithResultLs <- queryRequestWithResultLsFuture
+      ret <- fetchStep(orgQuery, queryRequestWithResultLs)
+    } yield ret
+  }
+
+  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = {
+    val fallback = {
+      val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty)
+      Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult())))
+    }
+    Try {
+
+      if (q.steps.isEmpty) {
+        // TODO: this should be get vertex query.
+        fallback
+      } else {
+        // current stepIdx = -1
+        val startQueryResultLs = QueryResult.fromVertices(q)
+        q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) =>
+            fetchStepFuture(q, acc)
+//          fetchStepFuture(q, acc).map { stepResults =>
+//            step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult) =>
+//              val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor)
+//              qParam.cursorOpt = Option(cursor)
+//            }
+//            stepResults
+//          }
+        }
+      }
+    } recover {
+      case e: Exception =>
+        logger.error(s"getEdgesAsync: $e", e)
+        fallback
+    } get
+  }
+
+  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = {
+    val ts = System.currentTimeMillis()
+    val futures = for {
+      (srcVertex, tgtVertex, queryParam) <- params
+      propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, queryParam.label.schemaVersion))
+      edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
+    } yield {
+        fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
+          val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
+          val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
+          val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
+          val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0)))
+          QueryRequestWithResult(queryRequest, queryResult)
+        }
+      }
+
+    Future.sequence(futures)
+  }
+
+
+
+  @tailrec
+  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+    if (range < sampleNumber || set.size == sampleNumber) set
+    else randomInt(sampleNumber, range, set + Random.nextInt(range))
+  }
+
+  protected def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+    if (edges.size <= n) {
+      edges
+    } else {
+      val plainEdges = if (queryRequest.queryParam.offset == 0) {
+        edges.tail
+      } else edges
+
+      val randoms = randomInt(n, plainEdges.size)
+      var samples = List.empty[EdgeWithScore]
+      var idx = 0
+      plainEdges.foreach { e =>
+        if (randoms.contains(idx)) samples = e :: samples
+        idx += 1
+      }
+      samples.toSeq
+    }
+
+  }
+  /** end of query */
+
+  /** Mutation Builder */
+
+
+  /** EdgeMutate */
+  def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+    val deleteMutations = edgeMutate.edgesToDelete.flatMap { indexEdge =>
+      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
+    }
+    val insertMutations = edgeMutate.edgesToInsert.flatMap { indexEdge =>
+      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+    }
+
+    deleteMutations ++ insertMutations
+  }
+
+  def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+    edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues).getOrElse(Nil)
+
+  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+    (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
+      case (true, true) =>
+
+        /** when there is no need to update. shouldUpdate == false */
+        List.empty
+      case (true, false) =>
+
+        /** no edges to delete, but there are new edges to insert, so increase degree by 1 */
+        edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
+      case (false, true) =>
+
+        /** no edges to insert, but there are old edges to delete, so decrease degree by 1 */
+        edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
+      case (false, false) =>
+
+        /** update on existing edges so no change on degree */
+        List.empty
+    }
+
+  /** IndexEdge */
+  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
+    val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer))
+    val _indexedEdge = indexedEdge.copy(props = newProps)
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment))
+  }
+
+  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
+    val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer))
+    val _indexedEdge = indexedEdge.copy(props = newProps)
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment))
+  }
+  def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = {
+    val kvs = vertexSerializer(vertex).toKeyValues
+    val kv = kvs.head
+    vertex.belongLabelIds.map { id =>
+      kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)), operation = SKeyValue.Delete)
+    }
+  }
+
+  def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
+    if (edge.op == GraphUtil.operations("delete"))
+      buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+    else
+      vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+
+  def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
+    vertex.op match {
+      case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
+      case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+    }
+  }
+}
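
The writeLock contract documented above is the heart of the strong-consistency path: write the requested snapshot key value only when the currently stored value matches the expected one. Below is a minimal, self-contained sketch of that compare-and-set semantics over an in-memory map; the KV class and the ConcurrentHashMap store are hypothetical stand-ins for SKeyValue and a real storage client, not part of this patch.

    import java.util.concurrent.ConcurrentHashMap
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical stand-in for SKeyValue: one row key, one value.
    case class KV(row: String, value: String)

    object CasLockSketch {
      private val store = new ConcurrentHashMap[String, String]()

      // Mirrors the writeLock contract: write `requested` only if the stored
      // value equals `expectedOpt` (None means the row must be absent).
      def writeLock(requested: KV, expectedOpt: Option[KV]): Future[Boolean] =
        Future {
          expectedOpt match {
            case None =>
              store.putIfAbsent(requested.row, requested.value) == null
            case Some(expected) =>
              store.replace(requested.row, expected.value, requested.value)
          }
        }
    }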

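In the same spirit, the retry loop inside mutateEdgesInner recurses on FetchTimeoutException or PartialFailureException with a bounded random backoff and gives up after MaxRetryNum attempts. Reduced to a generic helper under assumed names (maxTries, maxBackOffMs, op), the shape looks like this:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.Random

    object RetrySketch {
      // Retry `op` on failure with a bounded random backoff, giving up after
      // maxTries attempts, in the spirit of mutateEdgesInner's inner `retry`.
      def retry[T](maxTries: Int, maxBackOffMs: Int)(op: () => Future[T])
                  (implicit ec: ExecutionContext): Future[T] = {
        def loop(tryNum: Int): Future[T] =
          op().recoverWith {
            case _ if tryNum < maxTries =>
              Thread.sleep(Random.nextInt(maxBackOffMs) + 1) // blocking backoff, as in the patch
              loop(tryNum + 1)
          }
        loop(1)
      }
    }
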
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
new file mode 100644
index 0000000..74cd308
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
@@ -0,0 +1,95 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.QueryParam
+import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
+
+object StorageDeserializable {
+  /** Deserializer */
+  def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
+    val byte = bytes(offset)
+    val isInverted = (byte & 1) != 0
+    val labelOrderSeq = byte >> 1
+    (labelOrderSeq.toByte, isInverted)
+  }
+
+  def bytesToKeyValues(bytes: Array[Byte],
+                       offset: Int,
+                       length: Int,
+                       version: String): (Array[(Byte, InnerValLike)], Int) = {
+    var pos = offset
+    val len = bytes(pos)
+    pos += 1
+    val kvs = new Array[(Byte, InnerValLike)](len)
+    var i = 0
+    while (i < len) {
+      val k = bytes(pos)
+      pos += 1
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      pos += numOfBytesUsed
+      kvs(i) = (k -> v)
+      i += 1
+    }
+    val ret = (kvs, pos)
+    //    logger.debug(s"bytesToProps: $ret")
+    ret
+  }
+
+  def bytesToKeyValuesWithTs(bytes: Array[Byte],
+                             offset: Int,
+                             version: String): (Array[(Byte, InnerValLikeWithTs)], Int) = {
+    var pos = offset
+    val len = bytes(pos)
+    pos += 1
+    val kvs = new Array[(Byte, InnerValLikeWithTs)](len)
+    var i = 0
+    while (i < len) {
+      val k = bytes(pos)
+      pos += 1
+      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version)
+      pos += numOfBytesUsed
+      kvs(i) = (k -> v)
+      i += 1
+    }
+    val ret = (kvs, pos)
+    //    logger.debug(s"bytesToProps: $ret")
+    ret
+  }
+
+  def bytesToProps(bytes: Array[Byte],
+                   offset: Int,
+                   version: String): (Array[(Byte, InnerValLike)], Int) = {
+    var pos = offset
+    val len = bytes(pos)
+    pos += 1
+    val kvs = new Array[(Byte, InnerValLike)](len)
+    var i = 0
+    while (i < len) {
+      val k = HBaseType.EMPTY_SEQ_BYTE
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      pos += numOfBytesUsed
+      kvs(i) = (k -> v)
+      i += 1
+    }
+    //    logger.error(s"bytesToProps: $kvs")
+    val ret = (kvs, pos)
+
+    ret
+  }
+
+  def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset)
+}
+
+trait StorageDeserializable[E] {
+  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
+    try {
+      Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt))
+    } catch {
+      case e: Exception =>
+        logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
+        None
+    }
+  }
+  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
+}
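
Note: the decoders above share one length-prefixed layout: a single count byte, then count entries, each an optional key byte followed by a variable-length InnerVal. A minimal, dependency-free sketch of that loop, with a hypothetical one-byte stand-in for InnerVal.fromBytes (the real decoding is version-dependent):

    object LengthPrefixedDecodeSketch {
      // hypothetical stand-in for InnerVal.fromBytes: one signed byte per value
      private def readValue(bytes: Array[Byte], pos: Int): (Long, Int) = (bytes(pos).toLong, 1)

      /** mirrors bytesToKeyValues: [count][key][value][key][value]... starting at offset */
      def decode(bytes: Array[Byte], offset: Int): (Array[(Byte, Long)], Int) = {
        var pos = offset
        val len = bytes(pos)
        pos += 1
        val kvs = new Array[(Byte, Long)](len)
        var i = 0
        while (i < len) {
          val k = bytes(pos)
          pos += 1
          val (v, used) = readValue(bytes, pos)
          pos += used
          kvs(i) = k -> v
          i += 1
        }
        (kvs, pos) // the end position lets the caller continue parsing the buffer
      }
    }

As in bytesToProps, the returned end position matters: qualifiers and rows pack more fields after the props block, so parsers resume from it.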

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
new file mode 100644
index 0000000..46ce539
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
@@ -0,0 +1,41 @@
+package org.apache.s2graph.core.storage
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
+
+object StorageSerializable {
+  /** serializer */
+  def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+    val len = props.length
+    assert(len < Byte.MaxValue)
+    // index props are stored without key bytes; bytesToProps relies on this layout
+    var bytes = Array.fill(1)(len.toByte)
+    for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes)
+    bytes
+  }
+
+  def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+    val len = props.length
+    assert(len < Byte.MaxValue)
+    var bytes = Array.fill(1)(len.toByte)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    bytes
+  }
+
+  def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = {
+    val len = props.length
+    assert(len < Byte.MaxValue)
+    var bytes = Array.fill(1)(len.toByte)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    bytes
+  }
+
+  def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = {
+    assert(labelOrderSeq < (1 << 6))
+    // pack the label index sequence into the upper bits and the isInverted flag into bit 0
+    val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
+    Array.fill(1)(byte.toByte)
+  }
+}
+
+trait StorageSerializable[E] {
+  def toKeyValues: Seq[SKeyValue]
+}
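
Note: labelOrderSeqWithIsInverted is the inverse of bytesToLabelIndexSeqWithIsInverted from StorageDeserializable above. A small round-trip check makes the one-byte bit layout explicit (plain Scala, no dependencies):

    object LabelOrderSeqPackingSketch {
      def pack(labelOrderSeq: Byte, isInverted: Boolean): Byte =
        (labelOrderSeq << 1 | (if (isInverted) 1 else 0)).toByte

      def unpack(byte: Byte): (Byte, Boolean) =
        ((byte >> 1).toByte, (byte & 1) != 0)

      def main(args: Array[String]): Unit = {
        // the assert in labelOrderSeqWithIsInverted bounds the sequence to 6 bits,
        // which keeps the packed byte non-negative and the round trip lossless
        for (seq <- 0 until (1 << 6); inverted <- Seq(true, false)) {
          val (seq2, inverted2) = unpack(pack(seq.toByte, inverted))
          assert(seq2 == seq.toByte && inverted2 == inverted)
        }
        println("round trip ok for all 6-bit label order sequences")
      }
    }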

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
new file mode 100644
index 0000000..2560c9d
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -0,0 +1,534 @@
+package org.apache.s2graph.core.storage.hbase
+
+import java.util
+import java.util.Base64
+
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.apache.s2graph.core.utils.{DeferCache, Extensions, FutureCache, logger}
+import org.hbase.async._
+
+import scala.collection.JavaConversions._
+import scala.collection.{Map, Seq}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future, duration}
+import scala.util.hashing.MurmurHash3
+
+
+object AsynchbaseStorage {
+  val vertexCf = Serializable.vertexCf
+  val edgeCf = Serializable.edgeCf
+  val emptyKVs = new util.ArrayList[KeyValue]()
+
+
+  def makeClient(config: Config, overrideKv: (String, String)*) = {
+    // for a secured cluster, the Kerberos system properties must be set before the client is built
+    if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) {
+      System.setProperty("java.security.krb5.conf", config.getString("java.security.krb5.conf"))
+      System.setProperty("java.security.auth.login.config", config.getString("java.security.auth.login.config"))
+    }
+    val asyncConfig = new org.hbase.async.Config()
+
+    for (entry <- config.entrySet() if entry.getKey.contains("hbase")) {
+      asyncConfig.overrideConfig(entry.getKey, entry.getValue.unwrapped().toString)
+    }
+
+    for ((k, v) <- overrideKv) {
+      asyncConfig.overrideConfig(k, v)
+    }
+
+    val client = new HBaseClient(asyncConfig)
+    logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}")
+    client
+  }
+}
+
+
+class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
+  extends Storage[Deferred[QueryRequestWithResult]](config) {
+
+  import Extensions.DeferOps
+
+  /**
+   * Asynchbase client setup.
+   * Note that we need two clients: a buffered one for bulk writes (withWait=false)
+   * and one with flush interval 0 for withWait=true.
+   */
+  val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0")))
+  val client = AsynchbaseStorage.makeClient(config)
+
+  private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
+  private val clients = Seq(client, clientWithFlush)
+  private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toShort
+  private val emptyKeyValues = new util.ArrayList[KeyValue]()
+  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+
+  /** Future Cache to squash request */
+  private val futureCache = new DeferCache[QueryResult](config)(ec)
+
+  /** Simple Vertex Cache */
+  private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
+
+
+  /**
+   * Fire RPCs into the proper HBase cluster using the client and
+   * return true only when all mutations succeed; otherwise return false.
+   */
+  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
+    if (kvs.isEmpty) Future.successful(true)
+    else {
+      val _client = client(withWait)
+      val futures = kvs.map { kv =>
+        val _defer = kv.operation match {
+          case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp))
+          case SKeyValue.Delete =>
+            if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
+            else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp))
+          case SKeyValue.Increment =>
+            _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
+        }
+        val future = _defer.withCallback { ret => true }.recoverWith { ex =>
+          logger.error(s"mutation failed. $kv", ex)
+          false
+        }.toFuture
+
+        if (withWait) future else Future.successful(true)
+      }
+
+      Future.sequence(futures).map(_.forall(identity))
+    }
+  }
+
+
+  override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = {
+    val defer = fetchKeyValuesInner(hbaseRpc)
+    defer.toFuture.map { kvsArr =>
+      kvsArr.map { kv =>
+        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+      }.toSeq
+    }
+  }
+
+  /**
+   * Since HBase natively provides check-and-set at the storage level, the implementation is simple.
+   * @param rpc: the key value that needs to be stored.
+   * @param expectedOpt: the last valid value fetched for this rpc's KeyValue.value.
+   * @return true if the expected value matches and our rpc is applied successfully, otherwise false.
+   *         Note that when another thread has modified the same cell with a different value,
+   *         HBase atomically returns false.
+   */
+  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = {
+    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
+    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
+    client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture
+  }
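+  // Usage sketch (not part of this class): a caller acquires the snapshot-edge
+  // lock by fetching the current cell, passing it as expectedOpt, and retrying
+  // with back-off when compareAndSet loses the race to another writer:
+  //
+  //   writeLock(lockKv, fetchedSnapshotOpt).flatMap {
+  //     case true  => commitMutations()   // hypothetical helper
+  //     case false => retryWithBackOff()  // hypothetical helper
+  //   }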
+
+
+  /**
+   * Given a queryRequest, build the storage-specific RPC request.
+   * In the HBase case, we build either a Scanner or a GetRequest.
+   *
+   * IndexEdge layer:
+   *    Tall schema (v4): use a scanner.
+   *    Wide schema (label's schema version in v1, v2, v3): use a GetRequest with ColumnRangeFilter
+   *                                                        when the query is given an interval option.
+   * SnapshotEdge layer:
+   *    Tall schema (v3, v4): use a GetRequest without a column filter.
+   *    Wide schema (label's schema version in v1, v2): use a GetRequest with ColumnRangeFilter.
+   * Vertex layer:
+   *    all versions: use a GetRequest without a column filter.
+   * @param queryRequest
+   * @return Scanner or GetRequest with StartKey, EndKey, and RangeFilter set up properly.
+   */
+  override def buildRequest(queryRequest: QueryRequest): AnyRef = {
+    import Serializable._
+    val queryParam = queryRequest.queryParam
+    val label = queryParam.label
+    val edge = toRequestEdge(queryRequest)
+
+    val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+      val snapshotEdge = edge.toSnapshotEdge
+      snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
+    } else {
+      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq)
+      assert(indexedEdgeOpt.isDefined)
+
+      val indexedEdge = indexedEdgeOpt.get
+      indexEdgeSerializer(indexedEdge).toKeyValues.head
+    }
+
+    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
+
+    label.schemaVersion match {
+      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
+        val scanner = client.newScanner(label.hbaseTableName.getBytes)
+        scanner.setFamily(edgeCf)
+
+        /**
+         * TODO: remove this part.
+         */
+        val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
+        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam"))
+
+        // row key prefix: srcVertexId | labelWithDir | (labelIndexSeq + isInverted flag) | op
+        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+        val labelWithDirBytes = indexEdge.labelWithDir.bytes
+        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
+        val (startKey, stopKey) =
+          if (queryParam.columnRangeFilter != null) {
+            // an interval is set, so scan only the interval's key range under baseKey
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
+              case None => Bytes.add(baseKey, queryParam.columnRangeFilterMinBytes)
+            }
+            (_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes))
+          } else {
+            /**
+             * note: since propsToBytes encodes the size of the property map in the first byte,
+             * we can be sure 0xFF is a safe upper bound here.
+             */
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
+              case None => baseKey
+            }
+            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+          }
+
+        scanner.setStartKey(startKey)
+        scanner.setStopKey(stopKey)
+
+        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
+
+        scanner.setMaxVersions(1)
+        scanner.setMaxNumRows(queryParam.limit)
+        scanner.setMaxTimestamp(maxTs)
+        scanner.setMinTimestamp(minTs)
+        scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
+        scanner
+      case _ =>
+        val get =
+          if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
+          else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
+
+        get.maxVersions(1)
+        get.setFailfast(true)
+        get.setMaxResultsPerColumnFamily(queryParam.limit)
+        get.setRowOffsetPerColumnFamily(queryParam.offset)
+        get.setMinTimestamp(minTs)
+        get.setMaxTimestamp(maxTs)
+        get.setTimeout(queryParam.rpcTimeoutInMillis)
+
+        if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter)
+
+        get
+    }
+  }
+
+  /**
+   * We use a future cache to squash requests for the same key into a single storage call.
+   *
+   * @param queryRequest
+   * @param prevStepScore
+   * @param isInnerCall
+   * @param parentEdges
+   * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future;
+   *         map and flatMap on scala.concurrent.Future appear slower than Deferred's addCallback.
+   */
+  override def fetch(queryRequest: QueryRequest,
+                     prevStepScore: Double,
+                     isInnerCall: Boolean,
+                     parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
+
+    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
+      fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
+        val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
+        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
+          sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        } else edgeWithScores
+        QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
+      } recoverWith { ex =>
+        logger.error(s"fetchInner failed. fallback return. $hbaseRpc", ex)
+        QueryResult(isFailure = true)
+      }
+    }
+
+    val queryParam = queryRequest.queryParam
+    val cacheTTL = queryParam.cacheTTLInMillis
+    val request = buildRequest(queryRequest)
+
+    val defer =
+      if (cacheTTL <= 0) fetchInner(request)
+      else {
+        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+      }
+    defer.withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult) }
+  }
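+  // Note on the cache above: the cache key is derived from the query's cache-key bytes
+  // plus the request's row key, so concurrent fetches for the same key appear to share
+  // one in-flight Deferred instead of issuing duplicate storage RPCs; cached results
+  // expire after cacheTTL milliseconds.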
+
+
+  override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
+                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
+    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
+      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
+      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
+    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
+
+    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
+    grouped.withCallback { queryResults: util.ArrayList[QueryRequestWithResult] =>
+      queryResults.toIndexedSeq
+    }.toFuture
+  }
+
+
+  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request)
+
+
+  /**
+   * When withWait is given, we use the client with flushInterval set to 0.
+   * Otherwise, in the worst case each call adds extra wait time of up to flushInterval.
+   *
+   * @param edges
+   * @param withWait
+   * @return
+   */
+  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
+    val _client = client(withWait)
+    val defers: Seq[Deferred[(Boolean, Long)]] = for {
+      edge <- edges
+    } yield {
+        val edgeWithIndex = edge.edgesWithIndex.head
+        val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
+        val countVal = countWithTs.innerVal.toString().toLong
+        val incr = buildIncrementsCountAsync(edgeWithIndex, countVal).head
+        val request = incr.asInstanceOf[AtomicIncrementRequest]
+        _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
+          (true, resultCount.longValue())
+        } recoverWith { ex =>
+          logger.error(s"mutation failed. $request", ex)
+          (false, -1L)
+        }
+      }
+
+    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
+    grouped.toFuture.map(_.toSeq)
+  }
+
+
+  override def flush(): Unit = clients.foreach { client =>
+    val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
+    Await.result(client.flush().toFuture, timeout)
+  }
+
+
+  override def createTable(zkAddr: String,
+                           tableName: String,
+                           cfs: List[String],
+                           regionMultiplier: Int,
+                           ttl: Option[Int],
+                           compressionAlgorithm: String): Unit = {
+    logger.info(s"create table: $tableName on $zkAddr, $cfs, 
$regionMultiplier, $compressionAlgorithm")
+    val admin = getAdmin(zkAddr)
+    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+    if (!admin.tableExists(TableName.valueOf(tableName))) {
+      try {
+        val desc = new HTableDescriptor(TableName.valueOf(tableName))
+        desc.setDurability(Durability.ASYNC_WAL)
+        for (cf <- cfs) {
+          val columnDesc = new HColumnDescriptor(cf)
+            .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+            .setBloomFilterType(BloomType.ROW)
+            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+            .setMaxVersions(1)
+            .setTimeToLive(Int.MaxValue) // effectively no TTL unless overridden below
+            .setMinVersions(0)
+            .setBlocksize(32768)
+            .setBlockCacheEnabled(true)
+          ttl.foreach(columnDesc.setTimeToLive)
+          desc.addFamily(columnDesc)
+        }
+
+        if (regionCount <= 1) admin.createTable(desc)
+        else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+      } catch {
+        case e: Throwable =>
+          logger.error(s"$zkAddr, $tableName failed with $e", e)
+          throw e
+      }
+    } else {
+      logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+    }
+  }
+
+
+  /** The Asynchbase implementation overrides the default getVertices to use the future cache. */
+  override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+    def fromResult(queryParam: QueryParam,
+                   kvs: Seq[SKeyValue],
+                   version: String): Option[Vertex] = {
+
+      if (kvs.isEmpty) None
+      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+    }
+
+    val futures = vertices.map { vertex =>
+      val kvs = vertexSerializer(vertex).toKeyValues
+      val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
+      get.setFailfast(true)
+      get.maxVersions(1)
+
+      val cacheKey = MurmurHash3.stringHash(get.toString)
+      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs =>
+        fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
+      }
+    }
+
+    Future.sequence(futures).map { result => result.toList.flatten }
+  }
+
+  /**
+   * Private methods specific to the Asynchbase implementation.
+   */
+  private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = {
+    rpc match {
+      case getRequest: GetRequest => client.get(getRequest)
+      case scanner: Scanner =>
+        scanner.nextRows().withCallback { kvsLs =>
+          val ls = new util.ArrayList[KeyValue]
+          if (kvsLs != null) {
+            kvsLs.foreach { kvs =>
+              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
+            }
+          }
+          scanner.close()
+          ls
+        }.recoverWith { ex =>
+          logger.error(s"fetchKeyValuesInner failed.", ex)
+          scanner.close()
+          emptyKeyValues
+        }
+      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
+    }
+  }
+
+  private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = {
+    hbaseRpc match {
+      case getRequest: GetRequest => getRequest.key()
+      case scanner: Scanner => scanner.getCurrentKey()
+      case _ =>
+        logger.error(s"toCacheKeyBytes failed. not supported class type. 
$hbaseRpc")
+        Array.empty[Byte]
+    }
+  }
+
+  private def getSecureClusterAdmin(zkAddr: String) = {
+    val jaas = config.getString("java.security.auth.login.config")
+    val krb5Conf = config.getString("java.security.krb5.conf")
+    val realm = config.getString("realm")
+    val principal = config.getString("principal")
+    val keytab = config.getString("keytab")
+
+    System.setProperty("java.security.auth.login.config", jaas)
+    System.setProperty("java.security.krb5.conf", krb5Conf)
+    // System.setProperty("sun.security.krb5.debug", "true")
+    // System.setProperty("sun.security.spnego.debug", "true")
+    val conf = new Configuration(true)
+    val hConf = HBaseConfiguration.create(conf)
+
+    hConf.set("hbase.zookeeper.quorum", zkAddr)
+
+    hConf.set("hadoop.security.authentication", "Kerberos")
+    hConf.set("hbase.security.authentication", "Kerberos")
+    hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm)
+    hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm)
+
+    logger.info("Connecting to a secure cluster using a keytab.")
+    UserGroupInformation.setConfiguration(hConf)
+    UserGroupInformation.loginUserFromKeytab(principal, keytab)
+    val currentUser = UserGroupInformation.getCurrentUser()
+    logger.info(s"current user: $currentUser")
+
+    // get the admin interface for the secured connection
+    val conn = ConnectionFactory.createConnection(hConf)
+    conn.getAdmin
+  }
+
+  /**
+   * The following configuration entries must all be set to use a secured HBase cluster:
+   * 1. set hbase.security.auth.enable = true
+   * 2. set java.security.auth.login.config to the file path of the JAAS file
+   * 3. set java.security.krb5.conf to the file path of the Kerberos config file
+   * 4. set realm
+   * 5. set principal
+   * 6. set keytab to the file path of the keytab
+   * @param zkAddr
+   * @return
+   */
+  private def getAdmin(zkAddr: String) = {
+    if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) {
+      getSecureClusterAdmin(zkAddr)
+    } else {
+      val conf = HBaseConfiguration.create()
+      conf.set("hbase.zookeeper.quorum", zkAddr)
+      val conn = ConnectionFactory.createConnection(conf)
+      conn.getAdmin
+    }
+  }
+
+  private def enableTable(zkAddr: String, tableName: String) = {
+    getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
+  }
+
+  private def disableTable(zkAddr: String, tableName: String) = {
+    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
+  }
+
+  private def dropTable(zkAddr: String, tableName: String) = {
+    val admin = getAdmin(zkAddr)
+    admin.disableTable(TableName.valueOf(tableName))
+    admin.deleteTable(TableName.valueOf(tableName))
+  }
+
+  /** first split point when the table is pre-split into regionCount regions */
+  private def getStartKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes(Int.MaxValue / regionCount)
+  }
+
+  /** last split point: (regionCount - 1) times the region width Int.MaxValue / regionCount */
+  private def getEndKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes(Int.MaxValue / regionCount * (regionCount - 1))
+  }
+}
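
Note: for intuition on the pre-split math in getStartKey/getEndKey, a table pre-split into regionCount regions is cut at evenly spaced points Int.MaxValue / regionCount apart; createTable hands HBase the first point, the last point, and the region count. A dependency-free sketch of the split points (the real code encodes each as bytes with Bytes.toBytes):

    object PreSplitSketch {
      /** all split points for a table pre-split into regionCount regions */
      def splitPoints(regionCount: Int): Seq[Int] = {
        val width = Int.MaxValue / regionCount
        (1 until regionCount).map(_ * width)
      }

      def main(args: Array[String]): Unit = {
        // for 4 regions there are 3 split points; the first matches getStartKey
        // and the last matches getEndKey above
        println(splitPoints(4)) // Vector(536870911, 1073741822, 1610612733)
      }
    }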

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..143f02d
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -0,0 +1,132 @@
+package org.apache.s2graph.core.storage.serde.indexedge.tall
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.storage.StorageDeserializable._
+import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+   import StorageDeserializable._
+
+   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+   type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+   private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     val degree = bytesToLongFunc(kv.value, 0)
+     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+   }
+
+   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     var qualifierLen = 0
+     var pos = 0
+     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+       pos = endAt
+       qualifierLen += endAt
+       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+       }
+       qualifierLen += tgtVertexIdLen
+       (props, endAt, tgtVertexId, tgtVertexIdLen)
+     }
+     val (op, opLen) =
+       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+       else (kv.qualifier(qualifierLen), 1)
+
+     qualifierLen += opLen
+
+     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+   }
+
+   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+     (props, endAt)
+   }
+
+   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+     (Array.empty[(Byte, InnerValLike)], 0)
+   }
+
+   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                    _kvs: Seq[T],
+                                                    version: String,
+                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+     assert(_kvs.size == 1)
+
+     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+     val kv = kvs.head
+
+     // row key layout: srcVertexId | labelWithDir (4 bytes) | labelIdxSeq + isInverted (1 byte) | op (1 byte) | ...
+     var pos = 0
+     val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
+     pos += srcIdLen
+     val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+     pos += 4
+     val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+     pos += 1
+
+     val op = kv.row(pos)
+     pos += 1
+
+     if (pos == kv.row.length) {
+       // degree edge: the row ends right after op, and the value holds the degree count
+       val degreeVal = bytesToLongFunc(kv.value, 0)
+       val ts = kv.timestamp
+       val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version),
+         LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version))
+       val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
+     } else {
+       // not a degree edge
+       val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+       val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
+       pos = endAt
+       val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
+       }
+
+       val idxProps = for {
+         (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+       } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
+
+       val idxPropsMap = idxProps.toMap
+
+       val tgtVertexId =
+         idxPropsMap.get(LabelMeta.toSeq) match {
+           case None => tgtVertexIdRaw
+           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+         }
+
+       val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+         val countVal = bytesToLongFunc(kv.value, 0)
+         val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+         (dummyProps, 8)
+       } else {
+         bytesToKeyValues(kv.value, 0, kv.value.length, version)
+       }
+
+       val _mergedProps = (idxProps ++ props).toMap
+       val mergedProps =
+         if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+         else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+       val ts = kv.timestamp
+       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+     }
+   }
+ }
