http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 57d4872..c9353e1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -20,948 +20,16 @@ package org.apache.s2graph.core.storage -import org.apache.s2graph.core.GraphExceptions.{NoStackException, FetchTimeoutException} +import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, LabelMeta} -import org.apache.s2graph.core.parsers.WhereParser -import org.apache.s2graph.core.storage.serde.indexedge.wide.{IndexEdgeDeserializable, IndexEdgeSerializable} -import org.apache.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable -import org.apache.s2graph.core.storage.serde.vertex.{VertexDeserializable, VertexSerializable} +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} -import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Random, Try} -import java.util.concurrent.{Executors, TimeUnit} +import scala.concurrent.{ExecutionContext, Future} -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.hadoop.hbase.util.Bytes - - -abstract class Storage[Q, R](val graph: S2Graph, - val config: Config)(implicit ec: ExecutionContext) { - import HBaseType._ - import S2Graph._ - - val BackoffTimeout = graph.BackoffTimeout - val MaxRetryNum = graph.MaxRetryNum - val MaxBackOff = graph.MaxBackOff - val FailProb = graph.FailProb - val LockExpireDuration = graph.LockExpireDuration - val MaxSize = graph.MaxSize - val ExpireAfterWrite = graph.ExpireAfterWrite - val ExpireAfterAccess = graph.ExpireAfterAccess - - /** retry scheduler */ - val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() - - - /** - * Compatibility table - * | label schema version | snapshot edge | index edge | vertex | note | - * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | - * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | - * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema | - * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema. use scanner instead of get | - * - */ - - /** - * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue] - * so we can store this kvs. - * @param snapshotEdge: snapshotEdge to serialize - * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue] - */ - def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): Serializable[SnapshotEdge] = { - snapshotEdge.schemaVer match { -// case VERSION1 | - case VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge) - case VERSION3 | VERSION4 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge) - case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}") - } - } - - /** - * create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue] - * @param indexEdge: indexEdge to serialize - * @return serializer implementation - */ - def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = { - indexEdge.schemaVer match { -// case VERSION1 - case VERSION2 | VERSION3 => new IndexEdgeSerializable(indexEdge) - case VERSION4 => new serde.indexedge.tall.IndexEdgeSerializable(indexEdge) - case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}") - - } - } - - /** - * create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue] - * @param vertex: vertex to serialize - * @return serializer implementation - */ - def vertexSerializer(vertex: S2Vertex): Serializable[S2Vertex] = new VertexSerializable(vertex) - - /** - * create deserializer that can parse stored CanSKeyValue into snapshotEdge. - * note that each storage implementation should implement implicit type class - * to convert storage dependent dataType into common SKeyValue type by implementing CanSKeyValue - * - * ex) Asynchbase use it's KeyValue class and CanSKeyValue object has implicit type conversion method. - * if any storaage use different class to represent stored byte array, - * then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue. - * */ - - val snapshotEdgeDeserializer: Deserializable[SnapshotEdge] = new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph) - - def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] = snapshotEdgeDeserializer - - /** create deserializer that can parse stored CanSKeyValue into indexEdge. */ - val indexEdgeDeserializer: Deserializable[S2Edge] = new serde.indexedge.tall.IndexEdgeDeserializable(graph) - - def indexEdgeDeserializer(schemaVer: String) = new serde.indexedge.tall.IndexEdgeDeserializable(graph) - - /** create deserializer that can parser stored CanSKeyValue into vertex. */ - val vertexDeserializer: Deserializable[S2Vertex] = new VertexDeserializable(graph) - - - /** - * decide how to store given key values Seq[SKeyValue] into storage using storage's client. - * note that this should be return true on all success. - * we assumes that each storage implementation has client as member variable. - * - * - * @param cluster: where this key values should be stored. - * @param kvs: sequence of SKeyValue that need to be stored in storage. - * @param withWait: flag to control wait ack from storage. - * note that in AsynchbaseStorage(which support asynchronous operations), even with true, - * it never block thread, but rather submit work and notified by event loop when storage send ack back. - * @return ack message from storage. - */ - def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] - -// def writeToStorage(kv: SKeyValue, withWait: Boolean): Future[Boolean] - - /** - * fetch SnapshotEdge for given request from storage. - * also storage datatype should be converted into SKeyValue. - * note that return type is Sequence rather than single SKeyValue for simplicity, - * even though there is assertions sequence.length == 1. - * @param request - * @return - */ - def fetchSnapshotEdgeKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] - - /** - * write requestKeyValue into storage if the current value in storage that is stored matches. - * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. - * - * Most important thing is this have to be 'atomic' operation. - * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be - * either blocked or failed on write-write conflict case. - * - * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to - * prevent wrong data for read. - * - * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, - * compareAndSet to synchronize. - * - * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. - * for storage that does not support concurrency control, then storage implementation - * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) - * and write(writeLock). - * @param requestKeyValue - * @param expectedOpt - * @return - */ - def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] - - /** - * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues. - * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build - * client request(GetRequest, Scanner) based on user provided query. - * - * @param queryRequest - * @return - */ - protected def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q - - /** - * fetch IndexEdges for given queryParam in queryRequest. - * this expect previous step starting score to propagate score into next step. - * also parentEdges is necessary to return full bfs tree when query require it. - * - * note that return type is general type. - * for example, currently we wanted to use Asynchbase - * so single I/O return type should be Deferred[T]. - * - * if we use native hbase client, then this return type can be Future[T] or just T. - * - * @param queryRequest - * @param isInnerCall - * @param parentEdges - * @return - */ - def fetch(queryRequest: QueryRequest, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): R - - /** - * responsible to fire parallel fetch call into storage and create future that will return merged result. - * - * @param queryRequests - * @param prevStepEdges - * @return - */ - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] - - /** - * fetch Vertex for given request from storage. - * - * @param request - * @return - */ - def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] - - /** - * decide how to apply given edges(indexProps values + Map(_count -> countVal)) into storage. - * - * @param edges - * @param withWait - * @return - */ - def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] - - /** - * this method need to be called when client shutdown. this is responsible to cleanUp the resources - * such as client into storage. - */ - def flush(): Unit = { - } - - def fetchEdgesAll(): Future[Seq[S2Edge]] - - def fetchVerticesAll(): Future[Seq[S2Vertex]] - - /** - * create table on storage. - * if storage implementation does not support namespace or table, then there is nothing to be done - * - * @param zkAddr - * @param tableName - * @param cfs - * @param regionMultiplier - * @param ttl - * @param compressionAlgorithm - */ - def createTable(zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String, - replicationScopeOpt: Option[Int] = None, - totalRegionCount: Option[Int] = None): Unit - - def truncateTable(zkAddr: String, tableNameStr: String): Unit = {} - - def deleteTable(zkAddr: String, tableNameStr: String): Unit = {} - - def shutdown(): Unit - - /** Public Interface */ - def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { - def fromResult(kvs: Seq[SKeyValue], - version: String): Option[S2Vertex] = { - if (kvs.isEmpty) None - else vertexDeserializer.fromKeyValues(kvs, None) -// .map(S2Vertex(graph, _)) - } - - val futures = vertices.map { vertex => - val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - - fetchVertexKeyValues(queryRequest).map { kvs => - fromResult(kvs, vertex.serviceColumn.schemaVersion) - } recoverWith { case ex: Throwable => - Future.successful(None) - } - } - - Future.sequence(futures).map { result => result.toList.flatten } - } - def mutateStrongEdges(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = { - - val edgeWithIdxs = _edges.zipWithIndex - val grouped = edgeWithIdxs.groupBy { case (edge, idx) => - (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) - } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - // After deleteAll, process others - val mutateEdgeFutures = edges.toList match { - case head :: tail => - val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait) - - //TODO: decide what we will do on failure on vertex put - val puts = buildVertexPutsAsync(head) - val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - case Nil => Nil - } - - val composed = for { -// deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield mutateRet - - composed.map(_.forall(identity)).map { ret => idxs.map( idx => idx -> ret) } - } - - Future.sequence(mutateEdges).map { squashedRets => - squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) - } - } - - def mutateVertex(vertex: S2Vertex, withWait: Boolean): Future[Boolean] = { - if (vertex.op == GraphUtil.operations("delete")) { - writeToStorage(vertex.hbaseZkAddr, - vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. $vertex") - Future.successful(true) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - writeToStorage(vertex.hbaseZkAddr, buildPutsAll(vertex), withWait) - } - } - - def mutateVertices(vertices: Seq[S2Vertex], - withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = vertices.map { vertex => mutateVertex(vertex, withWait) } - Future.sequence(futures) - } - - - def mutateEdgesInner(edges: Seq[S2Edge], - checkConsistency: Boolean, - withWait: Boolean): Future[Boolean] = { - assert(edges.nonEmpty) - // TODO:: remove after code review: unreachable code - if (!checkConsistency) { - - val zkQuorum = edges.head.innerLabel.hbaseZkAddr - val futures = edges.map { edge => - val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = increments(edgeUpdate.deepCopy) - val mutations = - indexedEdgeMutations(edgeUpdate.deepCopy) ++ snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - - if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) - - writeToStorage(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => rets.forall(identity) } - } else { - fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => - retry(1)(edges, 0, snapshotEdgeOpt) - } - } - } - - def exponentialBackOff(tryNum: Int) = { - // time slot is divided by 10 ms - val slot = 10 - Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt) - } - - def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = { - if (tryNum >= MaxRetryNum) { - edges.foreach { edge => - logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") - } - - Future.successful(false) - } else { - val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt) - future.onSuccess { - case success => - logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n") - } - future recoverWith { - case FetchTimeoutException(retryEdge) => - logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") - /* fetch failed. re-fetch should be done */ - fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => - retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) - } - - case PartialFailureException(retryEdge, failedStatusCode, faileReason) => - val status = failedStatusCode match { - case 0 => "AcquireLock failed." - case 1 => "Mutation failed." - case 2 => "Increment failed." - case 3 => "ReleaseLock failed." - case 4 => "Unknown" - } - logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") - - /* retry logic */ - val promise = Promise[Boolean] - val backOff = exponentialBackOff(tryNum) - scheduledThreadPool.schedule(new Runnable { - override def run(): Unit = { - val future = if (failedStatusCode == 0) { - // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge. - /* fetch failed. re-fetch should be done */ - fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => - retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) - } - } else { - // partial failure occur while self locked and mutating. - // assert(fetchedSnapshotEdgeOpt.nonEmpty) - retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt) - } - promise.completeWith(future) - } - - }, backOff, TimeUnit.MILLISECONDS) - promise.future - - case ex: Exception => - logger.error("Unknown exception", ex) - Future.successful(false) - } - } - } - - protected def commitUpdate(edges: Seq[S2Edge], - statusCode: Byte, - fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = { -// Future.failed(new PartialFailureException(edges.head, 0, "ahahah")) - assert(edges.nonEmpty) -// assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined) - - statusCode match { - case 0 => - fetchedSnapshotEdgeOpt match { - case None => - /* - * no one has never mutated this SN. - * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges) - * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1) - * lock = (squashedEdge, pendingE) - * releaseLock = (edgeMutate.newSnapshotEdge, None) - */ - val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) - - assert(edgeMutate.newSnapshotEdge.isDefined) - - val lockTs = Option(System.currentTimeMillis()) - val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1) - val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, - pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) - - commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) - - case Some(snapshotEdge) => - snapshotEdge.pendingEdgeOpt match { - case None => - /* - * others finished commit on this SN. but there is no contention. - * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges) - * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ? - * lock = (snapshotEdge, pendingE) - * releaseLock = (edgeMutate.newSnapshotEdge, None) - */ - val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) - if (edgeMutate.newSnapshotEdge.isEmpty) { - logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}") - Future.successful(true) - } else { - val lockTs = Option(System.currentTimeMillis()) - val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) - val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, - pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) - commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) - } - case Some(pendingEdge) => - val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() - if (isLockExpired) { - /* - * if pendingEdge.ts == snapshotEdge.ts => - * (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge)) - * else => - * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge)) - * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) - * lock = (snapshotEdge, pendingE) - * releaseLock = (edgeMutate.newSnapshotEdge, None) - */ - logger.debug(s"${pendingEdge.toLogString} has been expired.") - val (squashedEdge, edgeMutate) = - if (pendingEdge.ts == snapshotEdge.ts) S2Edge.buildOperation(None, pendingEdge +: edges) - else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges) - - val lockTs = Option(System.currentTimeMillis()) - val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) - val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge)) - val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, - pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) - - commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) - } else { - /* - * others finished commit on this SN and there is currently contention. - * this can't be proceed so retry from re-fetch. - * throw EX - */ - val (squashedEdge, _) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) - Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]")) - } - } - - } - case _ => - - /* - * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock - */ - - /* - * this succeed to lock this SN. keep doing on commit process. - * if SN.isEmpty => - * no one never succed to commit on this SN. - * this is first mutation try on this SN. - * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges) - * else => - * assert(SN.pengingEdgeOpt.isEmpty) no-fetch after acquire lock when self retrying. - * there has been success commit on this SN. - * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges) - * releaseLock = (edgeMutate.newSnapshotEdge, None) - */ - val _edges = - if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges - else edges - val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) - val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2 - val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match { - case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) - case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) - } - // lockSnapshotEdge will be ignored. - commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) - } - } - /** - * orchestrate commit process. - * we separate into 4 step to avoid duplicating each step over and over. - * - * @param statusCode: current statusCode of this thread to process edges. - * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. - * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before commit process begin. - * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this snapshotEdge for this thread. - * @param releaseLockSnapshotEdge: releaseLockEdge that will remove lock by storing new final merged states - * all from current request edges and fetched snapshotEdge. - * @param edgeMutate: mutations for indexEdge and snapshotEdge. - * @return - */ - protected def commitProcess(statusCode: Byte, - squashedEdge: S2Edge, - fetchedSnapshotEdgeOpt:Option[S2Edge], - lockSnapshotEdge: SnapshotEdge, - releaseLockSnapshotEdge: SnapshotEdge, - edgeMutate: EdgeMutate): Future[Boolean] = { - for { - locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge) - mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge, edgeMutate) - incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode, squashedEdge, edgeMutate) - lockReleased <- releaseLock(incremented, statusCode, squashedEdge, releaseLockSnapshotEdge) - } yield lockReleased - } - - case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason) - - protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") - logger.debug(msg) - } - - protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", - s"${edgeMutate.toLogString}").mkString("\n") - logger.debug(msg) - } - - /** - * try to acquire lock on storage for this given snapshotEdge(lockEdge). - * - * @param statusCode: current statusCode of this thread to process edges. - * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug - * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage. - * @param lockEdge: lockEdge to build RPC request(compareAndSet) into Storage. - * @return - */ - protected def acquireLock(statusCode: Byte, - squashedEdge: S2Edge, - fetchedSnapshotEdgeOpt: Option[S2Edge], - lockEdge: SnapshotEdge): Future[Boolean] = { - if (statusCode >= 1) { - logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) { - Future.failed(new PartialFailureException(squashedEdge, 0, s"$p")) - } else { - val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head - val oldPut = fetchedSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head) - writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception => - logger.error(s"AcquireLock RPC Failed.") - throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC Failed") - }.map { ret => - if (ret) { - val log = Seq( - "\n", - "=" * 50, - s"[Success]: acquireLock", - s"[RequestEdge]: ${squashedEdge.toLogString}", - s"[LockEdge]: ${lockEdge.toLogString()}", - s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", - "=" * 50, "\n").mkString("\n") - - logger.debug(log) - // debug(ret, "acquireLock", edge.toSnapshotEdge) - } else { - throw new PartialFailureException(squashedEdge, 0, "hbase fail.") - } - true - } - } - } - } - - - /** - * change this snapshot's state on storage from locked into committed by - * storing new merged states on storage. merge state come from releaseLockEdge. - * note that releaseLock return Future.failed on predicate failure. - * - * @param predicate: indicate if this releaseLock phase should be proceed or not. - * @param statusCode: releaseLock do not use statusCode, only for debug. - * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug - * @param releaseLockEdge: final merged states if all process goes well. - * @return - */ - protected def releaseLock(predicate: Boolean, - statusCode: Byte, - squashedEdge: S2Edge, - releaseLockEdge: SnapshotEdge): Future[Boolean] = { - if (!predicate) { - Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed.")) - } else { - val p = Random.nextDouble() - if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p")) - else { - val releaseLockEdgePuts = snapshotEdgeSerializer(releaseLockEdge).toKeyValues - writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith { - case ex: Exception => - logger.error(s"ReleaseLock RPC Failed.") - throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed") - }.map { ret => - if (ret) { - debug(ret, "releaseLock", squashedEdge.toSnapshotEdge) - } else { - val msg = Seq("\nFATAL ERROR\n", - "=" * 50, - squashedEdge.toLogString, - releaseLockEdgePuts, - "=" * 50, - "\n" - ) - logger.error(msg.mkString("\n")) - // error(ret, "releaseLock", edge.toSnapshotEdge) - throw new PartialFailureException(squashedEdge, 3, "hbase fail.") - } - true - } - } - } - } - - /** - * - * @param predicate: indicate if this commitIndexEdgeMutations phase should be proceed or not. - * @param statusCode: current statusCode of this thread to process edges. - * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug - * @param edgeMutate: actual collection of mutations. note that edgeMutate contains snapshotEdge mutations, - * but in here, we only use indexEdge's mutations. - * @return - */ - protected def commitIndexEdgeMutations(predicate: Boolean, - statusCode: Byte, - squashedEdge: S2Edge, - edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed.")) - else { - if (statusCode >= 2) { - logger.debug(s"skip mutate: [$statusCode]\n${squashedEdge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p")) - else - writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate) - } else { - throw new PartialFailureException(squashedEdge, 1, "hbase fail.") - } - true - } - } - } - } - - /** - * - * @param predicate: indicate if this commitIndexEdgeMutations phase should be proceed or not. - * @param statusCode: current statusCode of this thread to process edges. - * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug - * @param edgeMutate: actual collection of mutations. note that edgeMutate contains snapshotEdge mutations, - * but in here, we only use indexEdge's degree mutations. - * @return - */ - protected def commitIndexEdgeDegreeMutations(predicate: Boolean, - statusCode: Byte, - squashedEdge: S2Edge, - edgeMutate: EdgeMutate): Future[Boolean] = { - - def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { - writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, kvs, withWait = withWait).map { ret => - if (ret) { - debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate) - } else { - throw new PartialFailureException(squashedEdge, 2, "hbase fail.") - } - true - } - } - - if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, "predicate failed.")) - if (statusCode >= 3) { - logger.debug(s"skip increment: [$statusCode]\n${squashedEdge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p")) - else { - val (bufferIncr, nonBufferIncr) = increments(edgeMutate.deepCopy) - - if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false) - _write(nonBufferIncr, withWait = true) - } - } - } - - /** end of methods for consistency */ - - def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge], - newEdge: S2Edge, edgeMutate: EdgeMutate) = - Seq("----------------------------------------------", - s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", - s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", - s"newEdge: ${newEdge.toLogString}", - s"mutation: \n${edgeMutate.toLogString}", - "----------------------------------------------").mkString("\n") - - - /** Delete All */ - def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int): Future[Boolean] = { - if (stepInnerResult.isEmpty) Future.successful(true) - else { - val head = stepInnerResult.edgeWithScores.head - val zkQuorum = head.edge.innerLabel.hbaseZkAddr - val futures = for { - edgeWithScore <- stepInnerResult.edgeWithScores - } yield { - val edge = edgeWithScore.edge - val score = edgeWithScore.score - - val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - - val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) - val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - buildIncrementsAsync(indexEdge, -1L) - } - - /* reverted direction */ - val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - buildIncrementsAsync(indexEdge, -1L) - } - - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - - writeToStorage(zkQuorum, mutations, withWait = true) - } - - Future.sequence(futures).map { rets => rets.forall(identity) } - } - } - - /** End Of Delete All */ - - - - - /** Parsing Logic: parse from kv from Storage into Edge */ - def toEdge[K: CanSKeyValue](kv: K, - queryRequest: QueryRequest, - cacheElementOpt: Option[S2Edge], - parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { - logger.debug(s"toEdge: $kv") - - try { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val schemaVer = queryParam.label.schemaVersion - val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt) - if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges)) - else indexEdgeOpt - } catch { - case ex: Exception => - logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex) - None - } - } - - def toSnapshotEdge[K: CanSKeyValue](kv: K, - queryRequest: QueryRequest, - cacheElementOpt: Option[SnapshotEdge] = None, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { -// logger.debug(s"SnapshottoEdge: $kv") - val queryParam = queryRequest.queryParam - val schemaVer = queryParam.label.schemaVersion - val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt) - - if (isInnerCall) { - snapshotEdgeOpt.flatMap { snapshotEdge => - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) - if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) - else None - } - } else { - snapshotEdgeOpt.flatMap { snapshotEdge => - if (snapshotEdge.allPropsDeleted) None - else { - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) - if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) - else None - } - } - } - } - - val dummyCursor: Array[Byte] = Array.empty - - def toEdges[K: CanSKeyValue](kvs: Seq[K], - queryRequest: QueryRequest, - prevScore: Double = 1.0, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore], - startOffset: Int = 0, - len: Int = Int.MaxValue): StepResult = { - - val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _ - - if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor)) - else { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val labelWeight = queryRequest.labelWeight - val nextStepOpt = queryRequest.nextStepOpt - val where = queryParam.where.get - val label = queryParam.label - val isDefaultTransformer = queryParam.edgeTransformer.isDefault - val first = kvs.head - val kv = first - val schemaVer = queryParam.label.schemaVersion - val cacheElementOpt = - if (queryParam.isSnapshotEdge) None - else indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), None) - - val (degreeEdges, keyValues) = cacheElementOpt match { - case None => (Nil, kvs) - case Some(cacheElement) => - val head = cacheElement - if (!head.isDegree) (Nil, kvs) - else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail) - } - - val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor) - - if (!queryOption.ignorePrevStepCache) { - val edgeWithScores = for { - (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len - edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq - if where == WhereParser.success || where.filter(edge) - convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) - } yield { - val score = edge.rank(queryParam.rank) - EdgeWithScore(convertedEdge, score, label) - } - StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor) - } else { - val degreeScore = 0.0 - - val edgeWithScores = for { - (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len - edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq - if where == WhereParser.success || where.filter(edge) - convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) - } yield { - val edgeScore = edge.rank(queryParam.rank) - val score = queryParam.scorePropagateOp match { - case "plus" => edgeScore + prevScore - case "divide" => - if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 - else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) - case _ => edgeScore * prevScore - } - val tsVal = processTimeDecay(queryParam, edge) - val newScore = degreeScore + score - EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label) - } - - val sampled = - if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) - else edgeWithScores - - val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled - - StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor) - } - } - } - - /** End Of Parse Logic */ - - protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = { +object Storage { + def toRequestEdge(graph: S2Graph)(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = { val srcVertex = queryRequest.vertex val queryParam = queryRequest.queryParam val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt @@ -988,133 +56,131 @@ abstract class Storage[Q, R](val graph: S2Graph, } } - protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { - /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache - * so use empty cacheKey. - * */ - val queryParam = QueryParam(labelName = edge.innerLabel.label, - direction = GraphUtil.fromDirection(edge.labelWithDir.dir), - tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), - cacheTTLInMillis = -1) - val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) - // val q = Query.toQuery(Seq(edge.srcVertex), queryParam) +} - fetchSnapshotEdgeKeyValues(queryRequest).map { kvs => - val (edgeOpt, kvOpt) = - if (kvs.isEmpty) (None, None) - else { - val snapshotEdgeOpt = toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) - val _kvOpt = kvs.headOption - (snapshotEdgeOpt, _kvOpt) - } - (queryParam, edgeOpt, kvOpt) - } recoverWith { case ex: Throwable => - logger.error(s"fetchQueryParam failed. fallback return.", ex) - throw new FetchTimeoutException(s"${edge.toLogString}") - } - } +abstract class Storage[Q](val graph: S2Graph, + val config: Config) { + /* Storage backend specific resource management */ + val management: StorageManagement + + /* Physically store given KeyValue into backend storage. */ + val mutator: StorageWritable + + /* + * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage + * then convert them into Edge/Vertex + */ + val fetcher: StorageReadable[Q] + + /* + * Serialize Edge/Vertex, to common KeyValue, SKeyValue that + * can be stored aligned to backend storage's physical schema. + * Also Deserialize storage backend's KeyValue to SKeyValue. + */ + val serDe: StorageSerDe + + /* + * Common helper to translate SKeyValue to Edge/Vertex and vice versa. + * Note that it require storage backend specific implementation for serialize/deserialize. + */ + lazy val io: StorageIO = new StorageIO(graph, serDe) + + /* + * Common helper to resolve write-write conflict on snapshot edge with same EdgeId. + * Note that it require storage backend specific implementations for + * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO + */ + lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher) + + + /** IO **/ + def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] = + serDe.snapshotEdgeSerializer(snapshotEdge) + + def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] = + serDe.indexEdgeSerializer(indexEdge) + + def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex] = + serDe.vertexSerializer(vertex) + def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] = + serDe.snapshotEdgeDeserializer(schemaVer) - /** end of query */ + def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable = + serDe.indexEdgeDeserializer(schemaVer) + + def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] = + serDe.vertexDeserializer(schemaVer) /** Mutation Builder */ + def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = + io.increments(edgeMutate) + def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = + io.indexedEdgeMutations(edgeMutate) - /** EdgeMutate */ - def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = { - // skip sampling for delete operation - val deleteMutations = edgeMutate.edgesToDeleteWithIndexOpt.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete, durability = indexEdge.label.durability)) - } + def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = + io.buildIncrementsAsync(indexedEdge, amount) - val insertMutations = edgeMutate.edgesToInsertWithIndexOpt.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) - } + def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = + io.buildIncrementsCountAsync(indexedEdge, amount) - deleteMutations ++ insertMutations - } + def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = + io.buildVertexPutsAsync(edge) def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = - edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil) + io.snapshotEdgeMutations(edgeMutate) - def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = { - (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match { - case (true, true) => - /* when there is no need to update. shouldUpdate == false */ - Nil -> Nil + def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = + io.buildDegreePuts(edge, degreeVal) - case (true, false) => - /* no edges to delete but there is new edges to insert so increase degree by 1 */ - val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree) - buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_)) + def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = + io.buildPutsAll(vertex) - case (false, true) => - /* no edges to insert but there is old edges to delete so decrease degree by 1 */ - val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree) - buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1)) + /** Mutation **/ - case (false, false) => - /* update on existing edges so no change on degree */ - Nil -> Nil - } - } + def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = + mutator.writeToStorage(cluster, kvs, withWait) - /** IndexEdge */ - def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.updatePropsWithTs() - newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts)) - val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) - indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) - } + def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = + mutator.writeLock(requestKeyValue, expectedOpt) - def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.updatePropsWithTs() - newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts)) - val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) - indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) - } + /** Fetch **/ - //TODO: ServiceColumn do not have durability property yet. - def buildDeleteBelongsToId(vertex: S2Vertex): Seq[SKeyValue] = { - val kvs = vertexSerializer(vertex).toKeyValues - val kv = kvs.head - vertex.belongLabelIds.map { id => - kv.copy(qualifier = Bytes.toBytes(S2Vertex.toPropKey(id)), operation = SKeyValue.Delete) - } - } + def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q = fetcher.buildRequest(queryRequest, edge) - def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = { - val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false) + def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q = fetcher.buildRequest(queryRequest, vertex) - if (storeVertex) { - if (edge.op == GraphUtil.operations("delete")) - buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex) - else - vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues - } else { - Seq.empty - } - } + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = + fetcher.fetches(queryRequests, prevStepEdges) - def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = { - edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts) - val kvs = edge.edgesWithIndexValid.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) - } + def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = fetcher.fetchKeyValues(rpc) - kvs - } + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll() - def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = { - vertex.op match { - case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) - case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - } - } + def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = fetcher.fetchVerticesAll() - def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName) + def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = + fetcher.fetchSnapshotEdgeInner(edge) + + /** Conflict Resolver **/ + def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = + conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt) + + /** Management **/ + def flush(): Unit = management.flush() + def createTable(config: Config, tableNameStr: String): Unit = management.createTable(config, tableNameStr) + + def truncateTable(config: Config, tableNameStr: String): Unit = management.truncateTable(config, tableNameStr) + + def deleteTable(config: Config, tableNameStr: String): Unit = management.deleteTable(config, tableNameStr) + + def shutdown(): Unit = management.shutdown() + + + def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName) }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala deleted file mode 100644 index 811cf62..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage - -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.QueryParam -import org.apache.s2graph.core.mysqls.{LabelMeta, Label} -import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs} -import org.apache.s2graph.core.utils.logger - -object StorageDeserializable { - /** Deserializer */ - def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { - val byte = bytes(offset) - val isInverted = if ((byte & 1) != 0) true else false - val labelOrderSeq = byte >> 1 - (labelOrderSeq.toByte, isInverted) - } - - def bytesToKeyValues(bytes: Array[Byte], - offset: Int, - length: Int, - schemaVer: String, - label: Label): (Array[(LabelMeta, InnerValLike)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(LabelMeta, InnerValLike)](len) - var i = 0 - while (i < len) { - val k = label.labelMetaMap(bytes(pos)) - pos += 1 - val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - val ret = (kvs, pos) - // logger.debug(s"bytesToProps: $ret") - ret - } - - def bytesToKeyValuesWithTs(bytes: Array[Byte], - offset: Int, - schemaVer: String, - label: Label): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len) - var i = 0 - while (i < len) { - val k = label.labelMetaMap(bytes(pos)) - pos += 1 - val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - val ret = (kvs, pos) - // logger.debug(s"bytesToProps: $ret") - ret - } - - def bytesToProps(bytes: Array[Byte], - offset: Int, - schemaVer: String): (Array[(LabelMeta, InnerValLike)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(LabelMeta, InnerValLike)](len) - var i = 0 - while (i < len) { - val k = LabelMeta.empty - val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - // logger.error(s"bytesToProps: $kvs") - val ret = (kvs, pos) - - ret - } - - def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset) - - def bytesToInt(bytes: Array[Byte], offset: Int): Int = Bytes.toInt(bytes, offset) -} - -trait StorageDeserializable[E] { - def fromKeyValues[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): Option[E] -// = { -// try { -// Option(fromKeyValuesInner(kvs, cacheElementOpt)) -// } catch { -// case e: Exception => -// logger.error(s"${this.getClass.getName} fromKeyValues failed.", e) -// None -// } -// } -// def fromKeyValuesInner[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): E -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala new file mode 100644 index 0000000..2e11f0b --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -0,0 +1,241 @@ +package org.apache.s2graph.core.storage + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.S2Graph.{convertEdges, normalize, processTimeDecay, sample} +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.parsers.WhereParser +import org.apache.s2graph.core.utils.logger + +class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { + val dummyCursor: Array[Byte] = Array.empty + + /** Parsing Logic: parse from kv from Storage into Edge */ + def toEdge[K: CanSKeyValue](kv: K, + queryRequest: QueryRequest, + cacheElementOpt: Option[S2Edge], + parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { + logger.debug(s"toEdge: $kv") + + try { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val schemaVer = queryParam.label.schemaVersion + val indexEdgeOpt = serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt) + if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges)) + else indexEdgeOpt + } catch { + case ex: Exception => + logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex) + None + } + } + + def toSnapshotEdge[K: CanSKeyValue](kv: K, + queryRequest: QueryRequest, + cacheElementOpt: Option[SnapshotEdge] = None, + isInnerCall: Boolean, + parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = { + // logger.debug(s"SnapshottoEdge: $kv") + val queryParam = queryRequest.queryParam + val schemaVer = queryParam.label.schemaVersion + val snapshotEdgeOpt = serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt) + + if (isInnerCall) { + snapshotEdgeOpt.flatMap { snapshotEdge => + val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) + if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) + else None + } + } else { + snapshotEdgeOpt.flatMap { snapshotEdge => + if (snapshotEdge.allPropsDeleted) None + else { + val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) + if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) + else None + } + } + } + } + + def toEdges[K: CanSKeyValue](kvs: Seq[K], + queryRequest: QueryRequest, + prevScore: Double = 1.0, + isInnerCall: Boolean, + parentEdges: Seq[EdgeWithScore], + startOffset: Int = 0, + len: Int = Int.MaxValue): StepResult = { + + val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _ + + if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor)) + else { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val labelWeight = queryRequest.labelWeight + val nextStepOpt = queryRequest.nextStepOpt + val where = queryParam.where.get + val label = queryParam.label + val isDefaultTransformer = queryParam.edgeTransformer.isDefault + val first = kvs.head + val kv = first + val schemaVer = queryParam.label.schemaVersion + val cacheElementOpt = + if (queryParam.isSnapshotEdge) None + else serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), None) + + val (degreeEdges, keyValues) = cacheElementOpt match { + case None => (Nil, kvs) + case Some(cacheElement) => + val head = cacheElement + if (!head.isDegree) (Nil, kvs) + else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail) + } + + val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor) + + if (!queryOption.ignorePrevStepCache) { + val edgeWithScores = for { + (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len + edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq + if where == WhereParser.success || where.filter(edge) + convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) + } yield { + val score = edge.rank(queryParam.rank) + EdgeWithScore(convertedEdge, score, label) + } + StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor) + } else { + val degreeScore = 0.0 + + val edgeWithScores = for { + (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len + edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq + if where == WhereParser.success || where.filter(edge) + convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) + } yield { + val edgeScore = edge.rank(queryParam.rank) + val score = queryParam.scorePropagateOp match { + case "plus" => edgeScore + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) + case _ => edgeScore * prevScore + } + val tsVal = processTimeDecay(queryParam, edge) + val newScore = degreeScore + score + EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label) + } + + val sampled = + if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + else edgeWithScores + + val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled + + StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor) + } + } + } + + /** End Of Parse Logic */ + + + /** end of query */ + + /** Mutation Builder */ + + + /** EdgeMutate */ + def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = { + // skip sampling for delete operation + val deleteMutations = edgeMutate.edgesToDeleteWithIndexOpt.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete, durability = indexEdge.label.durability)) + } + + val insertMutations = edgeMutate.edgesToInsertWithIndexOpt.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) + } + + deleteMutations ++ insertMutations + } + + def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = + edgeMutate.newSnapshotEdge.map(e => serDe.snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil) + + def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = { + (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match { + case (true, true) => + /* when there is no need to update. shouldUpdate == false */ + Nil -> Nil + + case (true, false) => + /* no edges to delete but there is new edges to insert so increase degree by 1 */ + val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree) + buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_)) + + case (false, true) => + /* no edges to insert but there is old edges to delete so decrease degree by 1 */ + val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree) + buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1)) + + case (false, false) => + /* update on existing edges so no change on degree */ + Nil -> Nil + } + } + + /** IndexEdge */ + def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { + val newProps = indexedEdge.updatePropsWithTs() + newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts)) + val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) + serDe.indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) + } + + def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { + val newProps = indexedEdge.updatePropsWithTs() + newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts)) + val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) + serDe.indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) + } + + //TODO: ServiceColumn do not have durability property yet. + def buildDeleteBelongsToId(vertex: S2Vertex): Seq[SKeyValue] = { + val kvs = serDe.vertexSerializer(vertex).toKeyValues + val kv = kvs.head + vertex.belongLabelIds.map { id => + kv.copy(qualifier = Bytes.toBytes(S2Vertex.toPropKey(id)), operation = SKeyValue.Delete) + } + } + + def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = { + val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false) + + if (storeVertex) { + if (edge.op == GraphUtil.operations("delete")) + buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex) + else + serDe.vertexSerializer(edge.srcForVertex).toKeyValues ++ serDe.vertexSerializer(edge.tgtForVertex).toKeyValues + } else { + Seq.empty + } + } + + def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = { + edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts) + val kvs = edge.edgesWithIndexValid.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) + } + + kvs + } + + def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = { + vertex.op match { + case d: Byte if d == GraphUtil.operations("delete") => serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) + case _ => serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala new file mode 100644 index 0000000..da94767 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageManagement.scala @@ -0,0 +1,35 @@ +package org.apache.s2graph.core.storage + +import com.typesafe.config.Config + +trait StorageManagement { + /** + * this method need to be called when client shutdown. this is responsible to cleanUp the resources + * such as client into storage. + */ + def flush(): Unit + + /** + * create table on storage. + * if storage implementation does not support namespace or table, then there is nothing to be done + * @param config + */ + def createTable(config: Config, tableNameStr: String): Unit + /** + * + * @param config + * @param tableNameStr + */ + def truncateTable(config: Config, tableNameStr: String): Unit + /** + * + * @param config + * @param tableNameStr + */ + def deleteTable(config: Config, tableNameStr: String): Unit + + /** + * + */ + def shutdown(): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala new file mode 100644 index 0000000..96669ca --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -0,0 +1,62 @@ +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException +import org.apache.s2graph.core._ +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +trait StorageReadable[Q] { + val io: StorageIO + /** + * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues. + * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build + * client request(GetRequest, Scanner) based on user provided query. + * + * @param queryRequest + * @return + */ + def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q + + def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q + + /** + * responsible to fire parallel fetch call into storage and create future that will return merged result. + * + * @param queryRequests + * @param prevStepEdges + * @return + */ + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] + + def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] + + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] + + def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] + + def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { + val queryParam = QueryParam(labelName = edge.innerLabel.label, + direction = GraphUtil.fromDirection(edge.labelWithDir.dir), + tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), + cacheTTLInMillis = -1) + val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) + + fetchKeyValues(buildRequest(queryRequest, edge)).map { kvs => + val (edgeOpt, kvOpt) = + if (kvs.isEmpty) (None, None) + else { + val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) + val _kvOpt = kvs.headOption + (snapshotEdgeOpt, _kvOpt) + } + (queryParam, edgeOpt, kvOpt) + } recoverWith { case ex: Throwable => + logger.error(s"fetchQueryParam failed. fallback return.", ex) + throw new FetchTimeoutException(s"${edge.toLogString}") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala new file mode 100644 index 0000000..f973e0f --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala @@ -0,0 +1,59 @@ +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core.{IndexEdge, S2Graph, S2Vertex, SnapshotEdge} +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable + +trait StorageSerDe { + /** + * Compatibility table + * | label schema version | snapshot edge | index edge | vertex | note | + * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | + * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | + * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema | + * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema. use scanner instead of get | + * + */ + + /** + * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue] + * so we can store this kvs. + * + * @param snapshotEdge : snapshotEdge to serialize + * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue] + */ + def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] + + /** + * create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue] + * + * @param indexEdge : indexEdge to serialize + * @return serializer implementation + */ + def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] + + /** + * create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue] + * + * @param vertex : vertex to serialize + * @return serializer implementation + */ + def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex] + + /** + * create deserializer that can parse stored CanSKeyValue into snapshotEdge. + * note that each storage implementation should implement implicit type class + * to convert storage dependent dataType into common SKeyValue type by implementing CanSKeyValue + * + * ex) Asynchbase use it's KeyValue class and CanSKeyValue object has implicit type conversion method. + * if any storaage use different class to represent stored byte array, + * then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue. + **/ + def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] + + def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable + + def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala deleted file mode 100644 index c1efe7b..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage - -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs} -import org.apache.s2graph.core.utils.logger - -object StorageSerializable { - /** serializer */ - def propsToBytes(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes) - bytes - } - - def propsToKeyValues(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes) - bytes - } - - def propsToKeyValuesWithTs(props: Seq[(LabelMeta, InnerValLikeWithTs)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes) - bytes - } - - def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { - assert(labelOrderSeq < (1 << 6)) - val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) - Array.fill(1)(byte.toByte) - } - - def intToBytes(value: Int): Array[Byte] = Bytes.toBytes(value) - - def longToBytes(value: Long): Array[Byte] = Bytes.toBytes(value) -} - -trait StorageSerializable[E] { - val cf = Serializable.edgeCf - - def table: Array[Byte] - def ts: Long - - def toRowKey: Array[Byte] - def toQualifier: Array[Byte] - def toValue: Array[Byte] - - def toKeyValues: Seq[SKeyValue] = { - val row = toRowKey - val qualifier = toQualifier - val value = toValue - val kv = SKeyValue(table, row, cf, qualifier, value, ts) -// logger.debug(s"[SER]: ${kv.toLogString}}") - Seq(kv) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala new file mode 100644 index 0000000..216aece --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala @@ -0,0 +1,45 @@ +package org.apache.s2graph.core.storage + +import scala.concurrent.{ExecutionContext, Future} + +trait StorageWritable { + /** + * decide how to store given key values Seq[SKeyValue] into storage using storage's client. + * note that this should be return true on all success. + * we assumes that each storage implementation has client as member variable. + * + * + * @param cluster: where this key values should be stored. + * @param kvs: sequence of SKeyValue that need to be stored in storage. + * @param withWait: flag to control wait ack from storage. + * note that in AsynchbaseStorage(which support asynchronous operations), even with true, + * it never block thread, but rather submit work and notified by event loop when storage send ack back. + * @return ack message from storage. + */ + def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] + + /** + * write requestKeyValue into storage if the current value in storage that is stored matches. + * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. + * + * Most important thing is this have to be 'atomic' operation. + * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be + * either blocked or failed on write-write conflict case. + * + * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to + * prevent wrong data for read. + * + * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, + * compareAndSet to synchronize. + * + * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. + * for storage that does not support concurrency control, then storage implementation + * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) + * and write(writeLock). + * @param requestKeyValue + * @param expectedOpt + * @return + */ + def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] + +}
