http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
deleted file mode 100644
index 8789502..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ /dev/null
@@ -1,1224 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.ExceptionHandler.{Key, Val, KafkaMessage}
-import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.storage.serde._
-import com.kakao.s2graph.core.storage.serde.snapshotedge.tall
-import 
com.kakao.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable
-import com.kakao.s2graph.core.storage.serde.vertex._
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.{Extensions, logger}
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.kafka.clients.producer.ProducerRecord
-import scala.annotation.tailrec
-import scala.collection.Seq
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Random, Try}
-
-abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
-  import HBaseType._
-
-  /** storage dependent configurations */
-  val MaxRetryNum = config.getInt("max.retry.number")
-  val MaxBackOff = config.getInt("max.back.off")
-  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
-  val FailProb = config.getDouble("hbase.fail.prob")
-  val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000)
-  val maxSize = config.getInt("future.cache.max.size")
-  val expireAfterWrite = config.getInt("future.cache.expire.after.write")
-  val expireAfterAccess = config.getInt("future.cache.expire.after.access")
-
-  /**
-   * Compatibility table
-   * | label schema version | snapshot edge | index edge | vertex | note |
-   * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do 
not use this. this exist only for backward compatibility issue |
-   * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do 
not use this. this exist only for backward compatibility issue |
-   * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | 
recommended with HBase. current stable schema |
-   * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | 
experimental schema. use scanner instead of get |
-   *
-   */
-
-  /**
-   * create serializer that knows how to convert given snapshotEdge into kvs: 
Seq[SKeyValue]
-   * so we can store this kvs.
-   * @param snapshotEdge: snapshotEdge to serialize
-   * @return serializer implementation for StorageSerializable which has 
toKeyValues return Seq[SKeyValue]
-   */
-  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): 
Serializable[SnapshotEdge] = {
-    snapshotEdge.schemaVer match {
-      case VERSION1 | VERSION2 => new 
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
-      case VERSION3 | VERSION4 => new 
serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
-      case _ => throw new RuntimeException(s"not supported version: 
${snapshotEdge.schemaVer}")
-    }
-  }
-
-  /**
-   * create serializer that knows how to convert given indexEdge into kvs: 
Seq[SKeyValue]
-   * @param indexEdge: indexEdge to serialize
-   * @return serializer implementation
-   */
-  def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
-    indexEdge.schemaVer match {
-      case VERSION1 | VERSION2 | VERSION3 => new 
indexedge.wide.IndexEdgeSerializable(indexEdge)
-      case VERSION4 => new indexedge.tall.IndexEdgeSerializable(indexEdge)
-      case _ => throw new RuntimeException(s"not supported version: 
${indexEdge.schemaVer}")
-
-    }
-  }
-
-  /**
-   * create serializer that knows how to convert given vertex into kvs: 
Seq[SKeyValue]
-   * @param vertex: vertex to serialize
-   * @return serializer implementation
-   */
-  def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
-
-  /**
-   * create deserializer that can parse stored CanSKeyValue into snapshotEdge.
-   * note that each storage implementation should implement implicit type class
-   * to convert storage dependent dataType into common SKeyValue type by 
implementing CanSKeyValue
-   *
-   * ex) Asynchbase use it's KeyValue class and CanSKeyValue object has 
implicit type conversion method.
-   * if any storaage use different class to represent stored byte array,
-   * then that storage implementation is responsible to provide implicit type 
conversion method on CanSKeyValue.
-   * */
-
-  val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = 
Map(
-    VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
-    VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
-    VERSION3 -> new tall.SnapshotEdgeDeserializable,
-    VERSION4 -> new tall.SnapshotEdgeDeserializable
-  )
-  def snapshotEdgeDeserializer(schemaVer: String) =
-    snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
-
-  /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
-  val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map(
-    VERSION1 -> new indexedge.wide.IndexEdgeDeserializable,
-    VERSION2 -> new indexedge.wide.IndexEdgeDeserializable,
-    VERSION3 -> new indexedge.wide.IndexEdgeDeserializable,
-    VERSION4 -> new indexedge.tall.IndexEdgeDeserializable
-  )
-
-  def indexEdgeDeserializer(schemaVer: String) =
-    indexEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
-
-  /** create deserializer that can parser stored CanSKeyValue into vertex. */
-  val vertexDeserializer = new VertexDeserializable
-
-
-  /**
-   * decide how to store given key values Seq[SKeyValue] into storage using 
storage's client.
-   * note that this should be return true on all success.
-   * we assumes that each storage implementation has client as member variable.
-   *
-   *
-   * @param cluster: where this key values should be stored.
-   * @param kvs: sequence of SKeyValue that need to be stored in storage.
-   * @param withWait: flag to control wait ack from storage.
-   *                  note that in AsynchbaseStorage(which support 
asynchronous operations), even with true,
-   *                  it never block thread, but rather submit work and 
notified by event loop when storage send ack back.
-   * @return ack message from storage.
-   */
-  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): 
Future[Boolean]
-
-//  def writeToStorage(kv: SKeyValue, withWait: Boolean): Future[Boolean]
-
-  /**
-   * fetch SnapshotEdge for given request from storage.
-   * also storage datatype should be converted into SKeyValue.
-   * note that return type is Sequence rather than single SKeyValue for 
simplicity,
-   * even though there is assertions sequence.length == 1.
-   * @param request
-   * @return
-   */
-  def fetchSnapshotEdgeKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
-
-  /**
-   * write requestKeyValue into storage if the current value in storage that 
is stored matches.
-   * note that we only use SnapshotEdge as place for lock, so this method only 
change SnapshotEdge.
-   *
-   * Most important thing is this have to be 'atomic' operation.
-   * When this operation is mutating requestKeyValue's snapshotEdge, then 
other thread need to be
-   * either blocked or failed on write-write conflict case.
-   *
-   * Also while this method is still running, then fetchSnapshotEdgeKeyValues 
should be synchronized to
-   * prevent wrong data for read.
-   *
-   * Best is use storage's concurrency control(either pessimistic or 
optimistic) such as transaction,
-   * compareAndSet to synchronize.
-   *
-   * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation 
to guarantee 'atomicity'.
-   * for storage that does not support concurrency control, then storage 
implementation
-   * itself can maintain manual locks that synchronize 
read(fetchSnapshotEdgeKeyValues)
-   * and write(writeLock).
-   * @param requestKeyValue
-   * @param expectedOpt
-   * @return
-   */
-  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue]): 
Future[Boolean]
-
-  /**
-   * build proper request which is specific into storage to call 
fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
-   * for example, Asynchbase use GetRequest, Scanner so this method is 
responsible to build
-   * client request(GetRequest, Scanner) based on user provided query.
-   * @param queryRequest
-   * @return
-   */
-  def buildRequest(queryRequest: QueryRequest): AnyRef
-
-  /**
-   * fetch IndexEdges for given queryParam in queryRequest.
-   * this expect previous step starting score to propagate score into next 
step.
-   * also parentEdges is necessary to return full bfs tree when query require 
it.
-   *
-   * note that return type is general type.
-   * for example, currently we wanted to use Asynchbase
-   * so single I/O return type should be Deferred[T].
-   *
-   * if we use native hbase client, then this return type can be Future[T] or 
just T.
-   * @param queryRequest
-   * @param prevStepScore
-   * @param isInnerCall
-   * @param parentEdges
-   * @return
-   */
-  def fetch(queryRequest: QueryRequest,
-            prevStepScore: Double,
-            isInnerCall: Boolean,
-            parentEdges: Seq[EdgeWithScore]): R
-
-  /**
-   * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
-   * @param queryRequestWithScoreLs
-   * @param prevStepEdges
-   * @return
-   */
-  def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): 
Future[Seq[QueryRequestWithResult]]
-
-  /**
-   * fetch Vertex for given request from storage.
-   * @param request
-   * @return
-   */
-  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
-
-  /**
-   * decide how to apply given edges(indexProps values + Map(_count -> 
countVal)) into storage.
-   * @param edges
-   * @param withWait
-   * @return
-   */
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]]
-
-  /**
-   * this method need to be called when client shutdown. this is responsible 
to cleanUp the resources
-   * such as client into storage.
-   */
-  def flush(): Unit
-
-  /**
-   * create table on storage.
-   * if storage implementation does not support namespace or table, then there 
is nothing to be done
-   * @param zkAddr
-   * @param tableName
-   * @param cfs
-   * @param regionMultiplier
-   * @param ttl
-   * @param compressionAlgorithm
-   */
-  def createTable(zkAddr: String,
-                  tableName: String,
-                  cfs: List[String],
-                  regionMultiplier: Int,
-                  ttl: Option[Int],
-                  compressionAlgorithm: String): Unit
-
-
-
-
-
-  /** Public Interface */
-
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[SKeyValue],
-                   version: String): Option[Vertex] = {
-      if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
-    }
-
-    val futures = vertices.map { vertex =>
-      val queryParam = QueryParam.Empty
-      val q = Query.toQuery(Seq(vertex), queryParam)
-      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-      fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs =>
-        fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion)
-      } recoverWith { case ex: Throwable =>
-        Future.successful(None)
-      }
-    }
-
-    Future.sequence(futures).map { result => result.toList.flatten }
-  }
-
-  def mutateElements(elements: Seq[GraphElement],
-                     withWait: Boolean = false): Future[Seq[Boolean]] = {
-
-    val edgeBuffer = ArrayBuffer[Edge]()
-    val vertexBuffer = ArrayBuffer[Vertex]()
-
-    elements.foreach {
-      case e: Edge => edgeBuffer += e
-      case v: Vertex => vertexBuffer += v
-      case any@_ => logger.error(s"Unknown type: ${any}")
-    }
-
-    val edgeFuture = mutateEdges(edgeBuffer, withWait)
-    val vertexFuture = mutateVertices(vertexBuffer, withWait)
-
-    val graphFuture = for {
-      edgesMutated <- edgeFuture
-      verticesMutated <- vertexFuture
-    } yield edgesMutated ++ verticesMutated
-
-    graphFuture
-  }
-
-  def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = 
{
-    val (strongEdges, weakEdges) =
-      (edges.partition(e => e.label.consistencyLevel == "strong" || e.op == 
GraphUtil.operations("insertBulk")))
-
-    val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map 
{ case (zkQuorum, edges) =>
-      val mutations = edges.flatMap { edge =>
-        val (_, edgeUpdate) =
-          if (edge.op == GraphUtil.operations("delete")) 
Edge.buildDeleteBulk(None, edge)
-          else Edge.buildOperation(None, Seq(edge))
-        buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++
-          snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
-      }
-      writeToStorage(zkQuorum, mutations, withWait)
-    }
-    val strongEdgesFutures = mutateStrongEdges(strongEdges, withWait)
-    for {
-      weak <- Future.sequence(weakEdgesFutures)
-      strong <- strongEdgesFutures
-    } yield {
-      strong ++ weak
-    }
-  }
-  def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): 
Future[Seq[Boolean]] = {
-
-    val grouped = _edges.groupBy { edge => (edge.label, 
edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq
-
-    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
-      val (deleteAllEdges, edges) = edgeGroup.partition(_.op == 
GraphUtil.operations("deleteAll"))
-
-      // DeleteAll first
-      val deleteAllFutures = deleteAllEdges.map { edge =>
-        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), 
edge.labelWithDir.dir, edge.ts)
-      }
-
-      // After deleteAll, process others
-      lazy val mutateEdgeFutures = edges.toList match {
-        case head :: tail =>
-          //          val strongConsistency = 
edges.head.label.consistencyLevel == "strong"
-          //          if (strongConsistency) {
-          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , 
withWait)(Edge.buildOperation)
-
-          //TODO: decide what we will do on failure on vertex put
-          val puts = buildVertexPutsAsync(head)
-          val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, 
withWait)
-          Seq(edgeFuture, vertexFuture)
-        //          } else {
-        //            edges.map { edge => mutateEdge(edge, withWait = 
withWait) }
-        //          }
-        case Nil => Nil
-      }
-
-      val composed = for {
-        deleteRet <- Future.sequence(deleteAllFutures)
-        mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield deleteRet ++ mutateRet
-
-      composed.map(_.forall(identity))
-    }
-
-    Future.sequence(mutateEdges)
-  }
-
-  def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = {
-    if (vertex.op == GraphUtil.operations("delete")) {
-      writeToStorage(vertex.hbaseZkAddr,
-        vertexSerializer(vertex).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)), withWait)
-    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
-      logger.info(s"deleteAll for vertex is truncated. $vertex")
-      Future.successful(true) // Ignore withWait parameter, because deleteAll 
operation may takes long time
-    } else {
-      writeToStorage(vertex.hbaseZkAddr, buildPutsAll(vertex), withWait)
-    }
-  }
-
-  def mutateVertices(vertices: Seq[Vertex],
-                     withWait: Boolean = false): Future[Seq[Boolean]] = {
-    val futures = vertices.map { vertex => mutateVertex(vertex, withWait) }
-    Future.sequence(futures)
-  }
-
-
-  def mutateEdgesInner(edges: Seq[Edge],
-                       checkConsistency: Boolean,
-                       withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => 
(Edge, EdgeMutate)): Future[Boolean] = {
-    if (!checkConsistency) {
-      val zkQuorum = edges.head.label.hbaseZkAddr
-      val futures = edges.map { edge =>
-        val (_, edgeUpdate) = f(None, Seq(edge))
-        val mutations =
-          indexedEdgeMutations(edgeUpdate) ++
-            snapshotEdgeMutations(edgeUpdate) ++
-            increments(edgeUpdate)
-        writeToStorage(zkQuorum, mutations, withWait)
-      }
-      Future.sequence(futures).map { rets => rets.forall(identity) }
-    } else {
-      def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = {
-
-        fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
-
-          val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges)
-          logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}")
-          //shouldReplace false.
-          if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) {
-            logger.debug(s"${newEdge.toLogString} drop.")
-            Future.successful(true)
-          } else {
-            commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, 
edgeUpdate).map { ret =>
-              if (ret) {
-                logger.info(s"[Success] commit: 
\n${_edges.map(_.toLogString).mkString("\n")}")
-              } else {
-                throw new PartialFailureException(newEdge, 3, "commit failed.")
-              }
-              true
-            }
-          }
-        }
-      }
-      def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: 
(Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = {
-        if (tryNum >= MaxRetryNum) {
-          edges.foreach { edge =>
-            logger.error(s"commit failed after 
$MaxRetryNum\n${edge.toLogString}")
-            ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = 
edge))
-          }
-          Future.successful(false)
-        } else {
-          val future = fn(edges, statusCode)
-          future.onSuccess {
-            case success =>
-              logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n")
-          }
-          future recoverWith {
-            case FetchTimeoutException(retryEdge) =>
-              logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
-              retry(tryNum + 1)(edges, statusCode)(fn)
-
-            case PartialFailureException(retryEdge, failedStatusCode, 
faileReason) =>
-              val status = failedStatusCode match {
-                case 0 => "AcquireLock failed."
-                case 1 => "Mutation failed."
-                case 2 => "Increment failed."
-                case 3 => "ReleaseLock failed."
-                case 4 => "Unknown"
-              }
-
-              Thread.sleep(Random.nextInt(MaxBackOff))
-              logger.info(s"[Try: $tryNum], [Status: $status] partial 
fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}")
-              retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn)
-            case ex: Exception =>
-              logger.error("Unknown exception", ex)
-              Future.successful(false)
-          }
-        }
-      }
-      retry(1)(edges, 0)(commit)
-    }
-  }
-
-  def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
-                newEdge: Edge, edgeMutate: EdgeMutate) =
-    Seq("----------------------------------------------",
-      s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
-      s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
-      s"newEdge: ${newEdge.toLogString}",
-      s"mutation: \n${edgeMutate.toLogString}",
-      "----------------------------------------------").mkString("\n")
-
-
-  /** Delete All */
-  protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest,
-                                              queryResult: QueryResult,
-                                              requestTs: Long,
-                                              retryNum: Int): Future[Boolean] 
= {
-    val queryParam = queryRequest.queryParam
-    val zkQuorum = queryParam.label.hbaseZkAddr
-    val futures = for {
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield {
-        /** reverted direction */
-        val reversedIndexedEdgesMutations = 
edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
-          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
-            buildIncrementsAsync(indexEdge, -1L)
-        }
-        val reversedSnapshotEdgeMutations = 
snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put))
-        val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { 
indexEdge =>
-          indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
-            buildIncrementsAsync(indexEdge, -1L)
-        }
-        val mutations = reversedIndexedEdgesMutations ++ 
reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-        writeToStorage(zkQuorum, mutations, withWait = true)
-      }
-
-    Future.sequence(futures).map { rets => rets.forall(identity) }
-  }
-
-  protected def buildEdgesToDelete(queryRequestWithResultLs: 
QueryRequestWithResult, requestTs: Long): QueryResult = {
-    val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResultLs).get
-    val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore =>
-      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }.map { edgeWithScore =>
-      val label = queryRequest.queryParam.label
-      val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-        case "strong" =>
-          val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
-            Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
-          (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-        case _ =>
-          val oldEdge = edgeWithScore.edge
-          (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
-      }
-
-      val copiedEdge =
-        edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs 
= newPropsWithTs)
-
-      val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-//      logger.debug(s"delete edge from deleteAll: 
${edgeToDelete.edge.toLogString}")
-      edgeToDelete
-    }
-
-    queryResult.copy(edgeWithScoreLs = edgeWithScoreLs)
-  }
-
-  protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = {
-    val queryResultLs = queryRequestWithResultLs.map(_.queryResult)
-    queryResultLs.foreach { queryResult =>
-      if (queryResult.isFailure) throw new RuntimeException("fetched result is 
fallback.")
-    }
-    val futures = for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs)
-      if deleteQueryResult.edgeWithScoreLs.nonEmpty
-    } yield {
-        val label = queryRequest.queryParam.label
-        label.schemaVersion match {
-          case HBaseType.VERSION3 | HBaseType.VERSION4 =>
-            if (label.consistencyLevel == "strong") {
-              /**
-               * read: snapshotEdge on queryResult = O(N)
-               * write: N x (relatedEdges x indices(indexedEdge) + 
1(snapshotEdge))
-               */
-              mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), 
withWait = true).map(_.forall(identity))
-            } else {
-              deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, 
requestTs, MaxRetryNum)
-            }
-          case _ =>
-
-            /**
-             * read: x
-             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x 
indices)
-             */
-            deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, 
requestTs, MaxRetryNum)
-        }
-      }
-
-    if (futures.isEmpty) {
-      // all deleted.
-      Future.successful(true -> true)
-    } else {
-      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
-    }
-  }
-
-  protected def fetchAndDeleteAll(query: Query, requestTs: Long): 
Future[(Boolean, Boolean)] = {
-    val future = for {
-      queryRequestWithResultLs <- getEdges(query)
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, 
requestTs)
-    } yield {
-//        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
-        (allDeleted, ret)
-      }
-
-    Extensions.retryOnFailure(MaxRetryNum) {
-      future
-    } {
-      logger.error(s"fetch and deleteAll failed.")
-      (true, false)
-    }
-
-  }
-
-  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
-                             labels: Seq[Label],
-                             dir: Int,
-                             ts: Long): Future[Boolean] = {
-
-    def enqueueLogMessage() = {
-      val kafkaMessages = for {
-        vertice <- srcVertices
-        id = vertice.innerId.toIdString()
-        label <- labels
-      } yield {
-          val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
GraphUtil.fromOp(dir.toByte)).mkString("\t")
-          val topic = ExceptionHandler.failTopic
-          val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, 
null, tsv))
-          kafkaMsg
-        }
-
-      ExceptionHandler.enqueues(kafkaMessages)
-    }
-
-    val requestTs = ts
-    val queryParams = for {
-      label <- labels
-    } yield {
-        val labelWithDir = LabelWithDirection(label.id.get, dir)
-        QueryParam(labelWithDir).limit(0, 
DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
-      }
-
-    val step = Step(queryParams.toList)
-    val q = Query(srcVertices, Vector(step))
-
-    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, 
Random.nextInt(MaxBackOff) + 1) {
-    val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) {
-      fetchAndDeleteAll(q, requestTs)
-    } { case (allDeleted, deleteSuccess) =>
-      allDeleted && deleteSuccess
-    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
-
-    retryFuture onFailure {
-      case ex =>
-        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
-        enqueueLogMessage()
-    }
-
-    retryFuture
-  }
-
-  /** End Of Delete All */
-
-
-
-
-  /** Parsing Logic: parse from kv from Storage into Edge */
-  def toEdge[K: CanSKeyValue](kv: K,
-                              queryParam: QueryParam,
-                              cacheElementOpt: Option[IndexEdge],
-                              parentEdges: Seq[EdgeWithScore]): Option[Edge] = 
{
-//        logger.debug(s"toEdge: $kv")
-    try {
-      val schemaVer = queryParam.label.schemaVersion
-      val indexEdgeOpt = 
indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), 
queryParam.label.schemaVersion, cacheElementOpt)
-      indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = 
parentEdges))
-    } catch {
-      case ex: Exception =>
-        logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}", ex)
-        None
-    }
-  }
-
-  def toSnapshotEdge[K: CanSKeyValue](kv: K,
-                                      queryParam: QueryParam,
-                                      cacheElementOpt: Option[SnapshotEdge] = 
None,
-                                      isInnerCall: Boolean,
-                                      parentEdges: Seq[EdgeWithScore]): 
Option[Edge] = {
-//        logger.debug(s"SnapshottoEdge: $kv")
-    val schemaVer = queryParam.label.schemaVersion
-    val snapshotEdgeOpt = 
snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), 
queryParam.label.schemaVersion, cacheElementOpt)
-
-    if (isInnerCall) {
-      snapshotEdgeOpt.flatMap { snapshotEdge =>
-        val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
-        if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
-        else None
-      }
-    } else {
-      snapshotEdgeOpt.flatMap { snapshotEdge =>
-        if (Edge.allPropsDeleted(snapshotEdge.props)) None
-        else {
-          val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
-          if (queryParam.where.map(_.filter(edge)).getOrElse(true)) 
Option(edge)
-          else None
-        }
-      }
-    }
-  }
-
-  def toEdges[K: CanSKeyValue](kvs: Seq[K],
-                               queryParam: QueryParam,
-                               prevScore: Double = 1.0,
-                               isInnerCall: Boolean,
-                               parentEdges: Seq[EdgeWithScore]): 
Seq[EdgeWithScore] = {
-    if (kvs.isEmpty) Seq.empty
-    else {
-      val first = kvs.head
-      val kv = first
-      val schemaVer = queryParam.label.schemaVersion
-      val cacheElementOpt =
-        if (queryParam.isSnapshotEdge) None
-        else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, 
Seq(kv), queryParam.label.schemaVersion, None)
-
-      for {
-        kv <- kvs
-        edge <-
-        if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, 
isInnerCall, parentEdges)
-        else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
-      } yield {
-        //TODO: Refactor this.
-        val currentScore =
-          queryParam.scorePropagateOp match {
-            case "plus" => edge.rank(queryParam.rank) + prevScore
-            case _ => edge.rank(queryParam.rank) * prevScore
-          }
-        EdgeWithScore(edge, currentScore)
-      }
-    }
-  }
-
-  /** End Of Parse Logic */
-
-//  /** methods for consistency */
-//  protected def writeAsyncSimple(zkQuorum: String, elementRpcs: 
Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
-//    if (elementRpcs.isEmpty) {
-//      Future.successful(true)
-//    } else {
-//      val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) }
-//      Future.sequence(futures).map(_.forall(identity))
-//    }
-//  }
-
-  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: 
String) extends Exception
-
-  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) 
= {
-    val msg = Seq(s"[$ret] [$phase]", 
s"${snapshotEdge.toLogString()}").mkString("\n")
-    logger.debug(msg)
-  }
-
-  protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, 
edgeMutate: EdgeMutate) = {
-    val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}",
-      s"${edgeMutate.toLogString}").mkString("\n")
-    logger.debug(msg)
-  }
-
-  protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, 
kvOpt: Option[SKeyValue]) = {
-    val currentTs = System.currentTimeMillis()
-    val lockTs = snapshotEdgeOpt match {
-      case None => Option(currentTs)
-      case Some(snapshotEdge) =>
-        snapshotEdge.pendingEdgeOpt match {
-          case None => Option(currentTs)
-          case Some(pendingEdge) => pendingEdge.lockTs
-        }
-    }
-    val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1
-    //      snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1
-    val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = 
lockTs)
-    val base = snapshotEdgeOpt match {
-      case None =>
-        // no one ever mutated on this snapshotEdge.
-        edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
-      case Some(snapshotEdge) =>
-        // there is at least one mutation have been succeed.
-        snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = 
Option(pendingEdge))
-    }
-    base.copy(version = newVersion, statusCode = 1, lockTs = None)
-  }
-
-  protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: 
SnapshotEdge,
-                                     edgeMutate: EdgeMutate) = {
-    val newVersion = lockEdge.version + 1
-    val base = edgeMutate.newSnapshotEdge match {
-      case None =>
-        // shouldReplace false
-        assert(snapshotEdgeOpt.isDefined)
-        snapshotEdgeOpt.get.toSnapshotEdge
-      case Some(newSnapshotEdge) => newSnapshotEdge
-    }
-    base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None)
-  }
-
-  protected def acquireLock(statusCode: Byte,
-                            edge: Edge,
-                            oldSnapshotEdgeOpt: Option[Edge],
-                            lockEdge: SnapshotEdge,
-                            oldBytes: Array[Byte]): Future[Boolean] = {
-    if (statusCode >= 1) {
-      logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p")
-      else {
-        val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
-        val oldPut = oldSnapshotEdgeOpt.map(e => 
snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head)
-//        val lockEdgePut = buildPutAsync(lockEdge).head
-//        val oldPut = oldSnapshotEdgeOpt.map(e => 
buildPutAsync(e.toSnapshotEdge).head)
-        writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception =>
-          logger.error(s"AcquireLock RPC Failed.")
-          throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed")
-        }.map { ret =>
-          if (ret) {
-            val log = Seq(
-              "\n",
-              "=" * 50,
-              s"[Success]: acquireLock",
-              s"[RequestEdge]: ${edge.toLogString}",
-              s"[LockEdge]: ${lockEdge.toLogString()}",
-              s"[PendingEdge]: 
${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}",
-              "=" * 50, "\n").mkString("\n")
-
-            logger.debug(log)
-            //            debug(ret, "acquireLock", edge.toSnapshotEdge)
-          } else {
-            throw new PartialFailureException(edge, 0, "hbase fail.")
-          }
-          true
-        }
-      }
-    }
-  }
-
-
-
-  protected def releaseLock(predicate: Boolean,
-                            edge: Edge,
-                            lockEdge: SnapshotEdge,
-                            releaseLockEdge: SnapshotEdge,
-                            _edgeMutate: EdgeMutate,
-                            oldBytes: Array[Byte]): Future[Boolean] = {
-    if (!predicate) {
-      throw new PartialFailureException(edge, 3, "predicate failed.")
-    }
-    val p = Random.nextDouble()
-    if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p")
-    else {
-      val releaseLockEdgePut = 
snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head
-      val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head
-      writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith {
-        case ex: Exception =>
-          logger.error(s"ReleaseLock RPC Failed.")
-          throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed")
-      }.map { ret =>
-        if (ret) {
-          debug(ret, "releaseLock", edge.toSnapshotEdge)
-        } else {
-          val msg = Seq("\nFATAL ERROR\n",
-            "=" * 50,
-            oldBytes.toList,
-            lockEdgePut,
-            releaseLockEdgePut,
-            //            lockEdgePut.value.toList,
-            //            releaseLockEdgePut.value().toList,
-            "=" * 50,
-            "\n"
-          )
-          logger.error(msg.mkString("\n"))
-          //          error(ret, "releaseLock", edge.toSnapshotEdge)
-          throw new PartialFailureException(edge, 3, "hbase fail.")
-        }
-        true
-      }
-    }
-    Future.successful(true)
-  }
-
-
-  protected def mutate(predicate: Boolean,
-                       edge: Edge,
-                       statusCode: Byte,
-                       _edgeMutate: EdgeMutate): Future[Boolean] = {
-    if (!predicate) throw new PartialFailureException(edge, 1, "predicate 
failed.")
-
-    if (statusCode >= 2) {
-      logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p")
-      else
-        writeToStorage(edge.label.hbaseZkAddr, 
indexedEdgeMutations(_edgeMutate), withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate)
-          } else {
-            throw new PartialFailureException(edge, 1, "hbase fail.")
-          }
-          true
-        }
-    }
-  }
-
-  protected def increment(predicate: Boolean,
-                          edge: Edge,
-                          statusCode: Byte, _edgeMutate: EdgeMutate): 
Future[Boolean] = {
-    if (!predicate) throw new PartialFailureException(edge, 2, "predicate 
failed.")
-    if (statusCode >= 3) {
-      logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}")
-      Future.successful(true)
-    } else {
-      val p = Random.nextDouble()
-      if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p")
-      else
-        writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate), 
withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate)
-          } else {
-            throw new PartialFailureException(edge, 2, "hbase fail.")
-          }
-          true
-        }
-    }
-  }
-
-
-  /** this may be overrided by specific storage implementation */
-  protected def commitProcess(edge: Edge, statusCode: Byte)
-                             (snapshotEdgeOpt: Option[Edge], kvOpt: 
Option[SKeyValue])
-                             (lockEdge: SnapshotEdge, releaseLockEdge: 
SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = {
-    val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte])
-    for {
-      locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge, 
oldBytes)
-      mutated <- mutate(locked, edge, statusCode, _edgeMutate)
-      incremented <- increment(mutated, edge, statusCode, _edgeMutate)
-      released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, 
_edgeMutate, oldBytes)
-    } yield {
-      released
-    }
-  }
-
-  protected def commitUpdate(edge: Edge,
-                             statusCode: Byte)(snapshotEdgeOpt: Option[Edge],
-                                               kvOpt: Option[SKeyValue],
-                                               edgeUpdate: EdgeMutate): 
Future[Boolean] = {
-    val label = edge.label
-    def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty)
-
-    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
-    val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, 
edgeUpdate)
-    val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_
-    snapshotEdgeOpt match {
-      case None =>
-        // no one ever did success on acquire lock.
-        _process(lockEdge, releaseLockEdge, edgeUpdate)
-      //        process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
-      case Some(snapshotEdge) =>
-        // someone did success on acquire lock at least one.
-        snapshotEdge.pendingEdgeOpt match {
-          case None =>
-            // not locked
-            _process(lockEdge, releaseLockEdge, edgeUpdate)
-          //            process(lockEdge, releaseLockEdge, edgeUpdate, 
statusCode)
-          case Some(pendingEdge) =>
-            def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < 
System.currentTimeMillis()
-            if (isLockExpired) {
-              val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) 
None else Option(snapshotEdge)
-              val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, 
Seq(pendingEdge))
-              val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, 
kvOpt)
-              val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, 
newLockEdge, newEdgeUpdate)
-              commitProcess(edge, statusCode = 0)(snapshotEdgeOpt, 
kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret =>
-                //              process(newLockEdge, newReleaseLockEdge, 
newEdgeUpdate, statusCode = 0).flatMap { ret =>
-                val log = s"[Success]: Resolving expired pending 
edge.\n${pendingEdge.toLogString}"
-                throw new PartialFailureException(edge, 0, log)
-              }
-            } else {
-              // locked
-              if (pendingEdge.ts == edge.ts && statusCode > 0) {
-                // self locked
-                val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) 
None else Option(snapshotEdge)
-                val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, 
Seq(edge))
-                val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, 
lockEdge, newEdgeUpdate)
-
-                /** lockEdge will be ignored */
-                _process(lockEdge, newReleaseLockEdge, newEdgeUpdate)
-                //                process(lockEdge, newReleaseLockEdge, 
newEdgeUpdate, statusCode)
-              } else {
-                throw new PartialFailureException(edge, statusCode, 
s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]")
-              }
-            }
-        }
-    }
-  }
-
-  /** end of methods for consistency */
-
-
-  //  def futureCache[T] = Cache[Long, (Long, T)]
-
-  protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
-    val srcVertex = queryRequest.vertex
-    //    val tgtVertexOpt = queryRequest.tgtVertexOpt
-    val edgeCf = Serializable.edgeCf
-    val queryParam = queryRequest.queryParam
-    val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
-    val label = queryParam.label
-    val labelWithDir = queryParam.labelWithDir
-    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-    val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
-      case Some(tgtVertexId) => // _to is given.
-        /** we use toSnapshotEdge so dont need to swap src, tgt */
-        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-        val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, 
label.schemaVersion)
-        (src, tgt)
-      case None =>
-        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-        (src, src)
-    }
-
-    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), 
TargetVertexId(tgtColumn.id.get, tgtInnerId))
-    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
-    val currentTs = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs)).toMap
-    Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs)
-  }
-
-
-
-  protected def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, 
Option[Edge], Option[SKeyValue])] = {
-    val labelWithDir = edge.labelWithDir
-    val queryParam = QueryParam(labelWithDir)
-    val _queryParam = 
queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-    val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-    val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-
-    fetchSnapshotEdgeKeyValues(buildRequest(queryRequest)).map { kvs =>
-      val (edgeOpt, kvOpt) =
-        if (kvs.isEmpty) (None, None)
-        else {
-          val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, 
parentEdges = Nil).headOption.map(_.edge)
-          val _kvOpt = kvs.headOption
-          (_edgeOpt, _kvOpt)
-        }
-      (queryParam, edgeOpt, kvOpt)
-    } recoverWith { case ex: Throwable =>
-      logger.error(s"fetchQueryParam failed. fallback return.", ex)
-      throw new FetchTimeoutException(s"${edge.toLogString}")
-    }
-  }
-
-  protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: 
Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
-    if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
-    else {
-      val queryRequest = queryRequestWithResultsLs.head.queryRequest
-      val q = orgQuery
-      val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
-
-      val stepIdx = queryRequest.stepIdx + 1
-
-      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
-      val prevStepThreshold = 
prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
-      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
-      val step = q.steps(stepIdx)
-      val alreadyVisited =
-        if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
-        else Graph.alreadyVisitedVertices(queryResultsLs)
-
-      val groupedBy = queryResultsLs.flatMap { queryResult =>
-        queryResult.edgeWithScoreLs.map { case edgeWithScore =>
-          edgeWithScore.edge.tgtVertex -> edgeWithScore
-        }
-      }.groupBy { case (vertex, edgeWithScore) => vertex }
-
-      val groupedByFiltered = for {
-        (vertex, edgesWithScore) <- groupedBy
-        aggregatedScore = edgesWithScore.map(_._2.score).sum if 
aggregatedScore >= prevStepThreshold
-      } yield vertex -> aggregatedScore
-
-      val prevStepTgtVertexIdEdges = for {
-        (vertex, edgesWithScore) <- groupedBy
-      } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) 
=> edgeWithScore }
-
-      val nextStepSrcVertices = if (prevStepLimit >= 0) {
-        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
-      } else {
-        groupedByFiltered.toSeq
-      }
-
-      val queryRequests = for {
-        (vertex, prevStepScore) <- nextStepSrcVertices
-        queryParam <- step.queryParams
-      } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
-
-      Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), 
alreadyVisited)(ec)
-    }
-  }
-
-  protected def fetchStepFuture(orgQuery: Query, 
queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): 
Future[Seq[QueryRequestWithResult]] = {
-    for {
-      queryRequestWithResultLs <- queryRequestWithResultLsFuture
-      ret <- fetchStep(orgQuery, queryRequestWithResultLs)
-    } yield ret
-  }
-
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = {
-    val fallback = {
-      val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, 
queryParam = QueryParam.Empty)
-      Future.successful(q.vertices.map(v => 
QueryRequestWithResult(queryRequest, QueryResult())))
-    }
-    Try {
-
-      if (q.steps.isEmpty) {
-        // TODO: this should be get vertex query.
-        fallback
-      } else {
-        // current stepIdx = -1
-        val startQueryResultLs = QueryResult.fromVertices(q)
-        q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, 
step) =>
-            fetchStepFuture(q, acc)
-//          fetchStepFuture(q, acc).map { stepResults =>
-//            step.queryParams.zip(stepResults).foreach { case (qParam, 
queryRequestWithResult)  =>
-//              val cursor = 
Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor)
-//              qParam.cursorOpt = Option(cursor)
-//            }
-//            stepResults
-//          }
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): 
Future[Seq[QueryRequestWithResult]] = {
-    val ts = System.currentTimeMillis()
-    val futures = for {
-      (srcVertex, tgtVertex, queryParam) <- params
-      propsWithTs = Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs.withLong(ts, ts, queryParam.label.schemaVersion))
-      edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = 
propsWithTs)
-    } yield {
-        fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-          val _queryParam = 
queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-          val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-          val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-          val queryResult = QueryResult(edgeOpt.toSeq.map(e => 
EdgeWithScore(e, 1.0)))
-          QueryRequestWithResult(queryRequest, queryResult)
-        }
-      }
-
-    Future.sequence(futures)
-  }
-
-
-
-  @tailrec
-  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = 
Set.empty[Int]): Set[Int] = {
-    if (range < sampleNumber || set.size == sampleNumber) set
-    else randomInt(sampleNumber, range, set + Random.nextInt(range))
-  }
-
-  protected def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], 
n: Int): Seq[EdgeWithScore] = {
-    if (edges.size <= n){
-      edges
-    }else{
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
-        edges.tail
-      } else edges
-
-      val randoms = randomInt(n, plainEdges.size)
-      var samples = List.empty[EdgeWithScore]
-      var idx = 0
-      plainEdges.foreach { e =>
-        if (randoms.contains(idx)) samples = e :: samples
-        idx += 1
-      }
-      samples.toSeq
-    }
-
-  }
-  /** end of query */
-
-  /** Mutation Builder */
-
-
-  /** EdgeMutate */
-  def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
-    val deleteMutations = edgeMutate.edgesToDelete.flatMap { indexEdge =>
-      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete))
-    }
-    val insertMutations = edgeMutate.edgesToInsert.flatMap { indexEdge =>
-      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put))
-    }
-
-    deleteMutations ++ insertMutations
-  }
-
-  def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
-    edgeMutate.newSnapshotEdge.map(e => 
snapshotEdgeSerializer(e).toKeyValues).getOrElse(Nil)
-
-  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] =
-    (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match 
{
-      case (true, true) =>
-
-        /** when there is no need to update. shouldUpdate == false */
-        List.empty
-      case (true, false) =>
-
-        /** no edges to delete but there is new edges to insert so increase 
degree by 1 */
-        edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
-      case (false, true) =>
-
-        /** no edges to insert but there is old edges to delete so decrease 
degree by 1 */
-        edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
-      case (false, false) =>
-
-        /** update on existing edges so no change on degree */
-        List.empty
-    }
-
-  /** IndexEdge */
-  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): 
Seq[SKeyValue] = {
-    val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> 
InnerVal.withLong(amount, indexedEdge.schemaVer))
-    val _indexedEdge = indexedEdge.copy(props = newProps)
-    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Increment))
-  }
-
-  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): 
Seq[SKeyValue] = {
-    val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> 
InnerVal.withLong(amount, indexedEdge.schemaVer))
-    val _indexedEdge = indexedEdge.copy(props = newProps)
-    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Increment))
-  }
-  def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = {
-    val kvs = vertexSerializer(vertex).toKeyValues
-    val kv = kvs.head
-    vertex.belongLabelIds.map { id =>
-      kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)), operation = 
SKeyValue.Delete)
-    }
-  }
-
-  def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
-    if (edge.op == GraphUtil.operations("delete"))
-      buildDeleteBelongsToId(edge.srcForVertex) ++ 
buildDeleteBelongsToId(edge.tgtForVertex)
-    else
-      vertexSerializer(edge.srcForVertex).toKeyValues ++ 
vertexSerializer(edge.tgtForVertex).toKeyValues
-
-  def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
-    vertex.op match {
-      case d: Byte if d == GraphUtil.operations("delete") => 
vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
-      case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = 
SKeyValue.Put))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
deleted file mode 100644
index 4b3300a..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.QueryParam
-import com.kakao.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, 
InnerValLikeWithTs}
-import org.apache.hadoop.hbase.util.Bytes
-
-object StorageDeserializable {
-  /** Deserializer */
-  def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): 
(Byte, Boolean) = {
-    val byte = bytes(offset)
-    val isInverted = if ((byte & 1) != 0) true else false
-    val labelOrderSeq = byte >> 1
-    (labelOrderSeq.toByte, isInverted)
-  }
-
-  def bytesToKeyValues(bytes: Array[Byte],
-                       offset: Int,
-                       length: Int,
-                       version: String): (Array[(Byte, InnerValLike)], Int) = {
-    var pos = offset
-    val len = bytes(pos)
-    pos += 1
-    val kvs = new Array[(Byte, InnerValLike)](len)
-    var i = 0
-    while (i < len) {
-      val k = bytes(pos)
-      pos += 1
-      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
-      pos += numOfBytesUsed
-      kvs(i) = (k -> v)
-      i += 1
-    }
-    val ret = (kvs, pos)
-    //    logger.debug(s"bytesToProps: $ret")
-    ret
-  }
-
-  def bytesToKeyValuesWithTs(bytes: Array[Byte],
-                             offset: Int,
-                             version: String): (Array[(Byte, 
InnerValLikeWithTs)], Int) = {
-    var pos = offset
-    val len = bytes(pos)
-    pos += 1
-    val kvs = new Array[(Byte, InnerValLikeWithTs)](len)
-    var i = 0
-    while (i < len) {
-      val k = bytes(pos)
-      pos += 1
-      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, 
version)
-      pos += numOfBytesUsed
-      kvs(i) = (k -> v)
-      i += 1
-    }
-    val ret = (kvs, pos)
-    //    logger.debug(s"bytesToProps: $ret")
-    ret
-  }
-
-  def bytesToProps(bytes: Array[Byte],
-                   offset: Int,
-                   version: String): (Array[(Byte, InnerValLike)], Int) = {
-    var pos = offset
-    val len = bytes(pos)
-    pos += 1
-    val kvs = new Array[(Byte, InnerValLike)](len)
-    var i = 0
-    while (i < len) {
-      val k = HBaseType.EMPTY_SEQ_BYTE
-      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
-      pos += numOfBytesUsed
-      kvs(i) = (k -> v)
-      i += 1
-    }
-    //    logger.error(s"bytesToProps: $kvs")
-    val ret = (kvs, pos)
-
-    ret
-  }
-
-  def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, 
offset)
-}
-
-trait StorageDeserializable[E] {
-  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], 
version: String, cacheElementOpt: Option[E]): Option[E] = {
-    try {
-      Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt))
-    } catch {
-      case e: Exception =>
-        logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
-        None
-    }
-  }
-  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], 
version: String, cacheElementOpt: Option[E]): E
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala
deleted file mode 100644
index 575f4ab..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.types.{InnerValLikeWithTs, InnerValLike}
-import org.apache.hadoop.hbase.util.Bytes
-
-object StorageSerializable {
-  /** serializer */
-  def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
-    val len = props.length
-    assert(len < Byte.MaxValue)
-    var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes)
-    bytes
-  }
-
-  def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
-    val len = props.length
-    assert(len < Byte.MaxValue)
-    var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
-    bytes
-  }
-
-  def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): 
Array[Byte] = {
-    val len = props.length
-    assert(len < Byte.MaxValue)
-    var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
-    bytes
-  }
-
-  def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): 
Array[Byte] = {
-    assert(labelOrderSeq < (1 << 6))
-    val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
-    Array.fill(1)(byte.toByte)
-  }
-}
-
-trait StorageSerializable[E] {
-  def toKeyValues: Seq[SKeyValue]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
deleted file mode 100644
index 8441c6b..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ /dev/null
@@ -1,533 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.storage._
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.{FutureCache, DeferCache, Extensions, 
logger}
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.{ConfigFactory, Config}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
-import org.apache.hadoop.security.UserGroupInformation
-import org.hbase.async._
-import scala.collection.JavaConversions._
-import scala.collection.{Map, Seq}
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future, duration}
-import scala.util.hashing.MurmurHash3
-import java.util
-import java.util.Base64
-
-
-object AsynchbaseStorage {
-  val vertexCf = Serializable.vertexCf
-  val edgeCf = Serializable.edgeCf
-  val emptyKVs = new util.ArrayList[KeyValue]()
-
-
-  def makeClient(config: Config, overrideKv: (String, String)*) = {
-    val asyncConfig: org.hbase.async.Config =
-      if (config.hasPath("hbase.security.auth.enable") && 
config.getBoolean("hbase.security.auth.enable")) {
-        val krb5Conf = config.getString("java.security.krb5.conf")
-        val jaas = config.getString("java.security.auth.login.config")
-
-        System.setProperty("java.security.krb5.conf", krb5Conf)
-        System.setProperty("java.security.auth.login.config", jaas)
-        new org.hbase.async.Config()
-      } else {
-        new org.hbase.async.Config()
-      }
-
-    for (entry <- config.entrySet() if entry.getKey.contains("hbase")) {
-      asyncConfig.overrideConfig(entry.getKey, 
entry.getValue.unwrapped().toString)
-    }
-
-    for ((k, v) <- overrideKv) {
-      asyncConfig.overrideConfig(k, v)
-    }
-
-    val client = new HBaseClient(asyncConfig)
-    logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}")
-    client
-  }
-}
-
-
-class AsynchbaseStorage(override val config: Config)(implicit ec: 
ExecutionContext)
-  extends Storage[Deferred[QueryRequestWithResult]](config) {
-
-  import Extensions.DeferOps
-
-  /**
-   * Asynchbase client setup.
-   * note that we need two client, one for bulk(withWait=false) and another 
for withWait=true
-   */
-  val configWithFlush = 
config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval"
 -> "0")))
-  val client = AsynchbaseStorage.makeClient(config)
-
-  private val clientWithFlush = AsynchbaseStorage.makeClient(config, 
"hbase.rpcs.buffered_flush_interval" -> "0")
-  private val clients = Seq(client, clientWithFlush)
-  private val clientFlushInterval = 
config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
-  private val emptyKeyValues = new util.ArrayList[KeyValue]()
-  private def client(withWait: Boolean): HBaseClient = if (withWait) 
clientWithFlush else client
-
-  /** Future Cache to squash request */
-  private val futureCache = new DeferCache[QueryResult](config)(ec)
-
-  /** Simple Vertex Cache */
-  private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
-
-
-  /**
-   * fire rpcs into proper hbase cluster using client and
-   * return true on all mutation success. otherwise return false.
-   */
-  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean): Future[Boolean] = {
-    if (kvs.isEmpty) Future.successful(true)
-    else {
-      val _client = client(withWait)
-      val futures = kvs.map { kv =>
-        val _defer = kv.operation match {
-          case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, 
kv.cf, kv.qualifier, kv.value, kv.timestamp))
-          case SKeyValue.Delete =>
-            if (kv.qualifier == null) _client.delete(new 
DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
-            else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, 
kv.qualifier, kv.timestamp))
-          case SKeyValue.Increment =>
-            _client.atomicIncrement(new AtomicIncrementRequest(kv.table, 
kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
-        }
-        val future = _defer.withCallback { ret => true }.recoverWith { ex =>
-          logger.error(s"mutation failed. $kv", ex)
-          false
-        }.toFuture
-
-        if (withWait) future else Future.successful(true)
-      }
-
-      Future.sequence(futures).map(_.forall(identity))
-    }
-  }
-
-
-  override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): 
Future[Seq[SKeyValue]] = {
-    val defer = fetchKeyValuesInner(hbaseRpc)
-    defer.toFuture.map { kvsArr =>
-      kvsArr.map { kv =>
-        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-      } toSeq
-    }
-  }
-
-  /**
-   * since HBase natively provide CheckAndSet on storage level, implementation 
becomes simple.
-   * @param rpc: key value that is need to be stored on storage.
-   * @param expectedOpt: last valid value for rpc's KeyValue.value from 
fetching.
-   * @return return true if expected value matches and our rpc is successfully 
applied, otherwise false.
-   *         note that when some other thread modified same cell and have 
different value on this KeyValue,
-   *         then HBase atomically return false.
-   */
-  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): 
Future[Boolean] = {
-    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, 
rpc.value, rpc.timestamp)
-    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
-    client(withWait = true).compareAndSet(put, expected).withCallback(ret => 
ret.booleanValue()).toFuture
-  }
-
-
-  /**
-   * given queryRequest, build storage specific RPC Request.
-   * In HBase case, we either build Scanner or GetRequest.
-   *
-   * IndexEdge layer:
-   *    Tall schema(v4): use scanner.
-   *    Wide schema(label's schema version in v1, v2, v3): use GetRequest with 
columnRangeFilter
-   *                                                       when query is given 
with itnerval option.
-   * SnapshotEdge layer:
-   *    Tall schema(v3, v4): use GetRequest without column filter.
-   *    Wide schema(label's schema version in v1, v2): use GetRequest with 
columnRangeFilter.
-   * Vertex layer:
-   *    all version: use GetRequest without column filter.
-   * @param queryRequest
-   * @return Scanner or GetRequest with proper setup with StartKey, EndKey, 
RangeFilter.
-   */
-  override def buildRequest(queryRequest: QueryRequest): AnyRef = {
-    import Serializable._
-    val queryParam = queryRequest.queryParam
-    val label = queryParam.label
-    val edge = toRequestEdge(queryRequest)
-
-    val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-      val snapshotEdge = edge.toSnapshotEdge
-      snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
-      //      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, 
kv.qualifier)
-    } else {
-      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == 
queryParam.labelOrderSeq)
-      assert(indexedEdgeOpt.isDefined)
-
-      val indexedEdge = indexedEdgeOpt.get
-      indexEdgeSerializer(indexedEdge).toKeyValues.head
-    }
-
-    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
-
-    label.schemaVersion match {
-      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
-        val scanner = client.newScanner(label.hbaseTableName.getBytes)
-        scanner.setFamily(edgeCf)
-
-        /**
-         * TODO: remove this part.
-         */
-        val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => 
edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
-        val indexEdge = indexEdgeOpt.getOrElse(throw new 
RuntimeException(s"Can`t find index for query $queryParam"))
-
-        val srcIdBytes = 
VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-        val labelWithDirBytes = indexEdge.labelWithDir.bytes
-        val labelIndexSeqWithIsInvertedBytes = 
StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, 
isInverted = false)
-        //        val labelIndexSeqWithIsInvertedStopBytes =  
StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, 
isInverted = true)
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, 
Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
-        val (startKey, stopKey) =
-          if (queryParam.columnRangeFilter != null) {
-            // interval is set.
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), 
Array.fill(1)(0))
-              case None => Bytes.add(baseKey, 
queryParam.columnRangeFilterMinBytes)
-            }
-            (_startKey, Bytes.add(baseKey, 
queryParam.columnRangeFilterMaxBytes))
-          } else {
-            /**
-             * note: since propsToBytes encode size of property map at first 
byte, we are sure about max value here
-             */
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), 
Array.fill(1)(0))
-              case None => baseKey
-            }
-            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
-          }
-//                logger.debug(s"[StartKey]: ${startKey.toList}")
-//                logger.debug(s"[StopKey]: ${stopKey.toList}")
-
-        scanner.setStartKey(startKey)
-        scanner.setStopKey(stopKey)
-
-        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: 
$queryParam")
-
-        scanner.setMaxVersions(1)
-        scanner.setMaxNumRows(queryParam.limit)
-        scanner.setMaxTimestamp(maxTs)
-        scanner.setMinTimestamp(minTs)
-        scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
-        // SET option for this rpc properly.
-        scanner
-      case _ =>
-        val get =
-          if (queryParam.tgtVertexInnerIdOpt.isDefined) new 
GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
-          else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
-
-        get.maxVersions(1)
-        get.setFailfast(true)
-        get.setMaxResultsPerColumnFamily(queryParam.limit)
-        get.setRowOffsetPerColumnFamily(queryParam.offset)
-        get.setMinTimestamp(minTs)
-        get.setMaxTimestamp(maxTs)
-        get.setTimeout(queryParam.rpcTimeoutInMillis)
-
-        if (queryParam.columnRangeFilter != null) 
get.setFilter(queryParam.columnRangeFilter)
-
-        get
-    }
-  }
-
-  /**
-   * we are using future cache to squash requests into same key on storage.
-   *
-   * @param queryRequest
-   * @param prevStepScore
-   * @param isInnerCall
-   * @param parentEdges
-   * @return we use Deferred here since it has much better performrance 
compared to scala.concurrent.Future.
-   *         seems like map, flatMap on scala.concurrent.Future is slower than 
Deferred's addCallback
-   */
-  override def fetch(queryRequest: QueryRequest,
-                     prevStepScore: Double,
-                     isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): 
Deferred[QueryRequestWithResult] = {
-
-    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
-      fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
-        val edgeWithScores = toEdges(kvs, queryRequest.queryParam, 
prevStepScore, isInnerCall, parentEdges)
-        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
-          sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
-        } else edgeWithScores
-        QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-//        QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores, tailCursor = 
kvs.lastOption.map(_.key).getOrElse(Array.empty)))
-
-      } recoverWith { ex =>
-        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        QueryResult(isFailure = true)
-//        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
-      }
-    }
-
-    val queryParam = queryRequest.queryParam
-    val cacheTTL = queryParam.cacheTTLInMillis
-    val request = buildRequest(queryRequest)
-
-    val defer =
-      if (cacheTTL <= 0) fetchInner(request)
-      else {
-        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, 
toCacheKeyBytes(request))
-        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
-    }
-    defer withCallback { queryResult => QueryRequestWithResult(queryRequest, 
queryResult)}
-  }
-
-
-  override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, 
Double)],
-                       prevStepEdges: Predef.Map[VertexId, 
scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
-    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
-      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
-      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
-    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, 
parentEdges)
-
-    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = 
Deferred.group(defers)
-    grouped withCallback {
-      queryResults: util.ArrayList[QueryRequestWithResult] =>
-        queryResults.toIndexedSeq
-    } toFuture
-  }
-
-
-  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = 
fetchSnapshotEdgeKeyValues(request)
-
-
-  /**
-   * when withWait is given, we use client with flushInterval set to 0.
-   * if we are not using this, then we are adding extra wait time as much as 
flushInterval in worst case.
-   *
-   * @param edges
-   * @param withWait
-   * @return
-   */
-  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]] = {
-    val _client = client(withWait)
-    val defers: Seq[Deferred[(Boolean, Long)]] = for {
-      edge <- edges
-    } yield {
-        val edgeWithIndex = edge.edgesWithIndex.head
-        val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
-        val countVal = countWithTs.innerVal.toString().toLong
-        val incr = buildIncrementsCountAsync(edgeWithIndex, countVal).head
-        val request = incr.asInstanceOf[AtomicIncrementRequest]
-        _client.bufferAtomicIncrement(request) withCallback { resultCount: 
java.lang.Long =>
-          (true, resultCount.longValue())
-        } recoverWith { ex =>
-          logger.error(s"mutation failed. $request", ex)
-          (false, -1L)
-        }
-      }
-
-    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = 
Deferred.groupInOrder(defers)
-    grouped.toFuture.map(_.toSeq)
-  }
-
-
-  override def flush(): Unit = clients.foreach { client =>
-    val timeout = Duration((clientFlushInterval + 10) * 20, 
duration.MILLISECONDS)
-    Await.result(client.flush().toFuture, timeout)
-  }
-
-
-  override def createTable(zkAddr: String,
-                           tableName: String,
-                           cfs: List[String],
-                           regionMultiplier: Int,
-                           ttl: Option[Int],
-                           compressionAlgorithm: String): Unit = {
-    logger.info(s"create table: $tableName on $zkAddr, $cfs, 
$regionMultiplier, $compressionAlgorithm")
-    val admin = getAdmin(zkAddr)
-    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    if (!admin.tableExists(TableName.valueOf(tableName))) {
-      try {
-        val desc = new HTableDescriptor(TableName.valueOf(tableName))
-        desc.setDurability(Durability.ASYNC_WAL)
-        for (cf <- cfs) {
-          val columnDesc = new HColumnDescriptor(cf)
-            
.setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-            .setBloomFilterType(BloomType.ROW)
-            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-            .setMaxVersions(1)
-            .setTimeToLive(2147483647)
-            .setMinVersions(0)
-            .setBlocksize(32768)
-            .setBlockCacheEnabled(true)
-          if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-          desc.addFamily(columnDesc)
-        }
-
-        if (regionCount <= 1) admin.createTable(desc)
-        else admin.createTable(desc, getStartKey(regionCount), 
getEndKey(regionCount), regionCount)
-      } catch {
-        case e: Throwable =>
-          logger.error(s"$zkAddr, $tableName failed with $e", e)
-          throw e
-      }
-    } else {
-      logger.info(s"$zkAddr, $tableName, $cfs already exist.")
-    }
-  }
-
-
-  /** Asynchbase implementation override default getVertices to use future 
Cache */
-  override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[SKeyValue],
-                   version: String): Option[Vertex] = {
-
-      if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
-    }
-
-    val futures = vertices.map { vertex =>
-      val kvs = vertexSerializer(vertex).toKeyValues
-      val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, 
Serializable.vertexCf)
-      //      get.setTimeout(this.singleGetTimeout.toShort)
-      get.setFailfast(true)
-      get.maxVersions(1)
-
-      val cacheKey = MurmurHash3.stringHash(get.toString)
-      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 
10000)(fetchVertexKeyValues(get)).map { kvs =>
-        fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
-      }
-    }
-
-    Future.sequence(futures).map { result => result.toList.flatten }
-  }
-
-
-
-
-
-  /**
-   * Private Methods which is specific to Asynchbase implementation.
-   */
-  private def fetchKeyValuesInner(rpc: AnyRef): 
Deferred[util.ArrayList[KeyValue]] = {
-    rpc match {
-      case getRequest: GetRequest => client.get(getRequest)
-      case scanner: Scanner =>
-        scanner.nextRows().withCallback { kvsLs =>
-          val ls = new util.ArrayList[KeyValue]
-          if (kvsLs == null) {
-
-          } else {
-            kvsLs.foreach { kvs =>
-              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
-              else {
-
-              }
-            }
-          }
-          scanner.close()
-          ls
-        }.recoverWith { ex =>
-          logger.error(s"fetchKeyValuesInner failed.", ex)
-          scanner.close()
-          emptyKeyValues
-        }
-      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues 
failed. $rpc"))
-    }
-  }
-
-  private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = {
-    hbaseRpc match {
-      case getRequest: GetRequest => getRequest.key()
-      case scanner: Scanner => scanner.getCurrentKey()
-      case _ =>
-        logger.error(s"toCacheKeyBytes failed. not supported class type. 
$hbaseRpc")
-        Array.empty[Byte]
-    }
-  }
-
-  private def getSecureClusterAdmin(zkAddr: String) = {
-    val jaas = config.getString("java.security.auth.login.config")
-    val krb5Conf = config.getString("java.security.krb5.conf")
-    val realm = config.getString("realm")
-    val principal = config.getString("principal")
-    val keytab = config.getString("keytab")
-
-
-
-    System.setProperty("java.security.auth.login.config", jaas)
-    System.setProperty("java.security.krb5.conf", krb5Conf)
-    // System.setProperty("sun.security.krb5.debug", "true")
-    // System.setProperty("sun.security.spnego.debug", "true")
-    val conf = new Configuration(true)
-    val hConf = HBaseConfiguration.create(conf)
-
-    hConf.set("hbase.zookeeper.quorum", zkAddr)
-
-    hConf.set("hadoop.security.authentication", "Kerberos")
-    hConf.set("hbase.security.authentication", "Kerberos")
-    hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm)
-    hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm)
-
-    System.out.println("Connecting secure cluster, using keytab\n")
-    UserGroupInformation.setConfiguration(hConf)
-    UserGroupInformation.loginUserFromKeytab(principal, keytab)
-    val currentUser = UserGroupInformation.getCurrentUser()
-    System.out.println("current user : " + currentUser + "\n")
-
-    // get table list
-    val conn = ConnectionFactory.createConnection(hConf)
-    conn.getAdmin
-  }
-
-  /**
-   * following configuration need to come together to use secured hbase 
cluster.
-   * 1. set hbase.security.auth.enable = true
-   * 2. set file path to jaas file java.security.auth.login.config
-   * 3. set file path to kerberos file java.security.krb5.conf
-   * 4. set realm
-   * 5. set principal
-   * 6. set file path to keytab
-   * @param zkAddr
-   * @return
-   */
-  private def getAdmin(zkAddr: String) = {
-    if (config.hasPath("hbase.security.auth.enable") && 
config.getBoolean("hbase.security.auth.enable")) {
-      getSecureClusterAdmin(zkAddr)
-    } else {
-      val conf = HBaseConfiguration.create()
-      conf.set("hbase.zookeeper.quorum", zkAddr)
-      val conn = ConnectionFactory.createConnection(conf)
-      conn.getAdmin
-    }
-  }
-
-  private def enableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
-  }
-
-  private def disableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-  }
-
-  private def dropTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-    getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
-  }
-
-  private def getStartKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount))
-  }
-
-  private def getEndKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
deleted file mode 100644
index 014a5c9..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.indexedge.tall
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.StorageDeserializable._
-import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, 
SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[IndexEdge] {
-   import StorageDeserializable._
-
-   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, 
Int)
-   type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
-   private def parseDegreeQualifier(kv: SKeyValue, version: String): 
QualifierRaw = {
-     //    val degree = Bytes.toLong(kv.value)
-     val degree = bytesToLongFunc(kv.value, 0)
-     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, 
version))
-     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, 
InnerVal.withStr("0", version))
-     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-   }
-
-   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-     var qualifierLen = 0
-     var pos = 0
-     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
-       pos = endAt
-       qualifierLen += endAt
-       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-         (HBaseType.defaultTgtVertexId, 0)
-       } else {
-         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, 
version)
-       }
-       qualifierLen += tgtVertexIdLen
-       (props, endAt, tgtVertexId, tgtVertexIdLen)
-     }
-     val (op, opLen) =
-       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-       else (kv.qualifier(qualifierLen), 1)
-
-     qualifierLen += opLen
-
-     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-   }
-
-   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, 
version)
-     (props, endAt)
-   }
-
-   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-     (Array.empty[(Byte, InnerValLike)], 0)
-   }
-
-   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                    _kvs: Seq[T],
-                                                    version: String,
-                                                    cacheElementOpt: 
Option[IndexEdge]): IndexEdge = {
-
-     assert(_kvs.size == 1)
-
-     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-     val kv = kvs.head
-
-     //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, 
${kv.value.toList}")
-     var pos = 0
-     val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, version)
-     pos += srcIdLen
-     val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-     pos += 4
-     val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-     pos += 1
-
-     val op = kv.row(pos)
-     pos += 1
-
-     if (pos == kv.row.length) {
-       // degree
-       //      val degreeVal = Bytes.toLong(kv.value)
-       val degreeVal = bytesToLongFunc(kv.value, 0)
-       val ts = kv.timestamp
-       val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, 
version),
-         LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version))
-       val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, 
InnerVal.withStr("0", version))
-       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), 
labelWithDir, op, ts, labelIdxSeq, props)
-     } else {
-       // not degree edge
-       val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw 
new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, 
${labelIdxSeq}"))
-
-       val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
-       pos = endAt
-       val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
-         (HBaseType.defaultTgtVertexId, 0)
-       } else {
-         TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
-       }
-
-       val idxProps = for {
-         (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-       } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
-
-       val idxPropsMap = idxProps.toMap
-
-       val tgtVertexId =
-         idxPropsMap.get(LabelMeta.toSeq) match {
-           case None => tgtVertexIdRaw
-           case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
-         }
-
-       val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
-         //        val countVal = Bytes.toLong(kv.value)
-         val countVal = bytesToLongFunc(kv.value, 0)
-         val dummyProps = Array(LabelMeta.countSeq -> 
InnerVal.withLong(countVal, version))
-         (dummyProps, 8)
-       } else {
-         bytesToKeyValues(kv.value, 0, kv.value.length, version)
-       }
-
-       val _mergedProps = (idxProps ++ props).toMap
-       val mergedProps =
-         if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-         else _mergedProps + (LabelMeta.timeStampSeq -> 
InnerVal.withLong(kv.timestamp, version))
-
-       val ts = kv.timestamp
-       IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), 
labelWithDir, op, ts, labelIdxSeq, mergedProps)
-
-     }
-   }
- }

Reply via email to