http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala deleted file mode 100644 index 8789502..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ /dev/null @@ -1,1224 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.ExceptionHandler.{Key, Val, KafkaMessage} -import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.storage.serde._ -import com.kakao.s2graph.core.storage.serde.snapshotedge.tall -import com.kakao.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable -import com.kakao.s2graph.core.storage.serde.vertex._ -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.{Extensions, logger} -import com.typesafe.config.Config -import org.apache.hadoop.hbase.util.Bytes -import org.apache.kafka.clients.producer.ProducerRecord -import scala.annotation.tailrec -import scala.collection.Seq -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Random, Try} - -abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { - import HBaseType._ - - /** storage dependent configurations */ - val MaxRetryNum = config.getInt("max.retry.number") - val MaxBackOff = config.getInt("max.back.off") - val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") - val FailProb = config.getDouble("hbase.fail.prob") - val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000) - val maxSize = config.getInt("future.cache.max.size") - val expireAfterWrite = config.getInt("future.cache.expire.after.write") - val expireAfterAccess = config.getInt("future.cache.expire.after.access") - - /** - * Compatibility table - * | label schema version | snapshot edge | index edge | vertex | note | - * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | - * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | - * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema | - * | v4 | serde.snapshotedge.tall | serde.indexedge.tall | serde.vertex | experimental schema. use scanner instead of get | - * - */ - - /** - * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue] - * so we can store this kvs. 
- * @param snapshotEdge: snapshotEdge to serialize
- * @return serializer implementation for StorageSerializable which has toKeyValues returning Seq[SKeyValue]
- */
- def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): Serializable[SnapshotEdge] = {
-   snapshotEdge.schemaVer match {
-     case VERSION1 | VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
-     case VERSION3 | VERSION4 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
-     case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}")
-   }
- }
-
- /**
-  * create serializer that knows how to convert the given indexEdge into kvs: Seq[SKeyValue]
-  * @param indexEdge: indexEdge to serialize
-  * @return serializer implementation
-  */
- def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
-   indexEdge.schemaVer match {
-     case VERSION1 | VERSION2 | VERSION3 => new indexedge.wide.IndexEdgeSerializable(indexEdge)
-     case VERSION4 => new indexedge.tall.IndexEdgeSerializable(indexEdge)
-     case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}")
-   }
- }
-
- /**
-  * create serializer that knows how to convert the given vertex into kvs: Seq[SKeyValue]
-  * @param vertex: vertex to serialize
-  * @return serializer implementation
-  */
- def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
-
- /**
-  * create deserializer that can parse stored CanSKeyValue into snapshotEdge.
-  * note that each storage implementation should implement an implicit type class
-  * to convert the storage-dependent dataType into the common SKeyValue type by implementing CanSKeyValue.
-  *
-  * ex) Asynchbase uses its KeyValue class, and the CanSKeyValue object has an implicit type conversion method.
-  * if any storage uses a different class to represent the stored byte array,
-  * then that storage implementation is responsible for providing an implicit type conversion method on CanSKeyValue.
-  */
-
- val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = Map(
-   VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
-   VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable,
-   VERSION3 -> new tall.SnapshotEdgeDeserializable,
-   VERSION4 -> new tall.SnapshotEdgeDeserializable
- )
- def snapshotEdgeDeserializer(schemaVer: String) =
-   snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
-
- /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
- val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map(
-   VERSION1 -> new indexedge.wide.IndexEdgeDeserializable,
-   VERSION2 -> new indexedge.wide.IndexEdgeDeserializable,
-   VERSION3 -> new indexedge.wide.IndexEdgeDeserializable,
-   VERSION4 -> new indexedge.tall.IndexEdgeDeserializable
- )
-
- def indexEdgeDeserializer(schemaVer: String) =
-   indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
-
- /** create deserializer that can parse stored CanSKeyValue into vertex. */
- val vertexDeserializer = new VertexDeserializable
-
-
- /**
-  * decide how to store the given key values Seq[SKeyValue] into storage using the storage's client.
-  * note that this should return true only when all writes succeed.
-  * we assume that each storage implementation has a client as a member variable.
-  *
-  * @param cluster: where these key values should be stored.
-  * @param kvs: sequence of SKeyValue that needs to be stored in storage.
- * @param withWait: flag to control waiting for the ack from storage.
- *        note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
- *        it never blocks a thread; it submits the work and is notified by the event loop when storage sends the ack back.
- * @return ack message from storage.
- */
- def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean]
-
-// def writeToStorage(kv: SKeyValue, withWait: Boolean): Future[Boolean]
-
- /**
-  * fetch the SnapshotEdge for the given request from storage.
-  * the storage datatype should also be converted into SKeyValue.
-  * note that the return type is a sequence rather than a single SKeyValue for simplicity,
-  * even though there is an assertion that sequence.length == 1.
-  * @param request
-  * @return
-  */
- def fetchSnapshotEdgeKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
-
- /**
-  * write requestKeyValue into storage only if the value currently stored there matches.
-  * note that we only use SnapshotEdge as the place for the lock, so this method only changes the SnapshotEdge.
-  *
-  * most importantly, this has to be an 'atomic' operation.
-  * while this operation is mutating requestKeyValue's snapshotEdge, other threads need to be
-  * either blocked or failed in the write-write conflict case.
-  *
-  * also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized to
-  * prevent reads of wrong data.
-  *
-  * it is best to use the storage's concurrency control (either pessimistic or optimistic), such as
-  * transactions or compareAndSet, to synchronize.
-  *
-  * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
-  * for storage that does not support concurrency control, the storage implementation
-  * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
-  * and write (writeLock).
-  * @param requestKeyValue
-  * @param expectedOpt
-  * @return
-  */
- def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean]
-
- /**
-  * build the proper storage-specific request to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
-  * for example, Asynchbase uses GetRequest and Scanner, so this method is responsible for building
-  * the client request (GetRequest, Scanner) based on the user-provided query.
-  * @param queryRequest
-  * @return
-  */
- def buildRequest(queryRequest: QueryRequest): AnyRef
-
- /**
-  * fetch IndexEdges for the given queryParam in queryRequest.
-  * this expects the previous step's starting score so it can propagate scores into the next step.
-  * parentEdges is also necessary to return the full BFS tree when the query requires it.
-  *
-  * note that the return type is a general type.
-  * for example, since we currently use Asynchbase,
-  * the single I/O return type should be Deferred[T].
-  *
-  * if we used the native HBase client, this return type could be Future[T] or just T.
-  * @param queryRequest
-  * @param prevStepScore
-  * @param isInnerCall
-  * @param parentEdges
-  * @return
-  */
- def fetch(queryRequest: QueryRequest,
-           prevStepScore: Double,
-           isInnerCall: Boolean,
-           parentEdges: Seq[EdgeWithScore]): R
-
- /**
-  * responsible for firing parallel fetch calls into storage and creating a future that returns the merged result.
-  * @param queryRequestWithScoreLs
-  * @param prevStepEdges
-  * @return
-  */
- def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
-             prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
-
- /**
-  * fetch the Vertex for the given request from storage.
- * @param request - * @return - */ - def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] - - /** - * decide how to apply given edges(indexProps values + Map(_count -> countVal)) into storage. - * @param edges - * @param withWait - * @return - */ - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] - - /** - * this method need to be called when client shutdown. this is responsible to cleanUp the resources - * such as client into storage. - */ - def flush(): Unit - - /** - * create table on storage. - * if storage implementation does not support namespace or table, then there is nothing to be done - * @param zkAddr - * @param tableName - * @param cfs - * @param regionMultiplier - * @param ttl - * @param compressionAlgorithm - */ - def createTable(zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String): Unit - - - - - - /** Public Interface */ - - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { - def fromResult(queryParam: QueryParam, - kvs: Seq[SKeyValue], - version: String): Option[Vertex] = { - if (kvs.isEmpty) None - else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None) - } - - val futures = vertices.map { vertex => - val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), queryParam) - val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs => - fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion) - } recoverWith { case ex: Throwable => - Future.successful(None) - } - } - - Future.sequence(futures).map { result => result.toList.flatten } - } - - def mutateElements(elements: Seq[GraphElement], - withWait: Boolean = false): Future[Seq[Boolean]] = { - - val edgeBuffer = ArrayBuffer[Edge]() - val vertexBuffer = ArrayBuffer[Vertex]() - - elements.foreach { - case e: Edge => edgeBuffer += e - case v: Vertex => vertexBuffer += v - case any@_ => logger.error(s"Unknown type: ${any}") - } - - val edgeFuture = mutateEdges(edgeBuffer, withWait) - val vertexFuture = mutateVertices(vertexBuffer, withWait) - - val graphFuture = for { - edgesMutated <- edgeFuture - verticesMutated <- vertexFuture - } yield edgesMutated ++ verticesMutated - - graphFuture - } - - def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { - val (strongEdges, weakEdges) = - (edges.partition(e => e.label.consistencyLevel == "strong" || e.op == GraphUtil.operations("insertBulk"))) - - val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) => - val mutations = edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge) - else Edge.buildOperation(None, Seq(edge)) - buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++ - snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) - } - writeToStorage(zkQuorum, mutations, withWait) - } - val strongEdgesFutures = mutateStrongEdges(strongEdges, withWait) - for { - weak <- Future.sequence(weakEdgesFutures) - strong <- strongEdgesFutures - } yield { - strong ++ weak - } - } - def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { - - val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val (deleteAllEdges, edges) = 
edgeGroup.partition(_.op == GraphUtil.operations("deleteAll")) - - // DeleteAll first - val deleteAllFutures = deleteAllEdges.map { edge => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) - } - - // After deleteAll, process others - lazy val mutateEdgeFutures = edges.toList match { - case head :: tail => - // val strongConsistency = edges.head.label.consistencyLevel == "strong" - // if (strongConsistency) { - val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait)(Edge.buildOperation) - - //TODO: decide what we will do on failure on vertex put - val puts = buildVertexPutsAsync(head) - val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - // } else { - // edges.map { edge => mutateEdge(edge, withWait = withWait) } - // } - case Nil => Nil - } - - val composed = for { - deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield deleteRet ++ mutateRet - - composed.map(_.forall(identity)) - } - - Future.sequence(mutateEdges) - } - - def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = { - if (vertex.op == GraphUtil.operations("delete")) { - writeToStorage(vertex.hbaseZkAddr, - vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. $vertex") - Future.successful(true) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - writeToStorage(vertex.hbaseZkAddr, buildPutsAll(vertex), withWait) - } - } - - def mutateVertices(vertices: Seq[Vertex], - withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = vertices.map { vertex => mutateVertex(vertex, withWait) } - Future.sequence(futures) - } - - - def mutateEdgesInner(edges: Seq[Edge], - checkConsistency: Boolean, - withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => (Edge, EdgeMutate)): Future[Boolean] = { - if (!checkConsistency) { - val zkQuorum = edges.head.label.hbaseZkAddr - val futures = edges.map { edge => - val (_, edgeUpdate) = f(None, Seq(edge)) - val mutations = - indexedEdgeMutations(edgeUpdate) ++ - snapshotEdgeMutations(edgeUpdate) ++ - increments(edgeUpdate) - writeToStorage(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => rets.forall(identity) } - } else { - def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = { - - fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => - - val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges) - logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}") - //shouldReplace false. 
- if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) { - logger.debug(s"${newEdge.toLogString} drop.") - Future.successful(true) - } else { - commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, edgeUpdate).map { ret => - if (ret) { - logger.info(s"[Success] commit: \n${_edges.map(_.toLogString).mkString("\n")}") - } else { - throw new PartialFailureException(newEdge, 3, "commit failed.") - } - true - } - } - } - } - def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: (Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = { - if (tryNum >= MaxRetryNum) { - edges.foreach { edge => - logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") - ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = edge)) - } - Future.successful(false) - } else { - val future = fn(edges, statusCode) - future.onSuccess { - case success => - logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n") - } - future recoverWith { - case FetchTimeoutException(retryEdge) => - logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") - retry(tryNum + 1)(edges, statusCode)(fn) - - case PartialFailureException(retryEdge, failedStatusCode, faileReason) => - val status = failedStatusCode match { - case 0 => "AcquireLock failed." - case 1 => "Mutation failed." - case 2 => "Increment failed." - case 3 => "ReleaseLock failed." - case 4 => "Unknown" - } - - Thread.sleep(Random.nextInt(MaxBackOff)) - logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") - retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn) - case ex: Exception => - logger.error("Unknown exception", ex) - Future.successful(false) - } - } - } - retry(1)(edges, 0)(commit) - } - } - - def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge], - newEdge: Edge, edgeMutate: EdgeMutate) = - Seq("----------------------------------------------", - s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", - s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", - s"newEdge: ${newEdge.toLogString}", - s"mutation: \n${edgeMutate.toLogString}", - "----------------------------------------------").mkString("\n") - - - /** Delete All */ - protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest, - queryResult: QueryResult, - requestTs: Long, - retryNum: Int): Future[Boolean] = { - val queryParam = queryRequest.queryParam - val zkQuorum = queryParam.label.hbaseZkAddr - val futures = for { - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } yield { - /** reverted direction */ - val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - buildIncrementsAsync(indexEdge, -1L) - } - val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - buildIncrementsAsync(indexEdge, -1L) - } - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - writeToStorage(zkQuorum, mutations, withWait = true) - } - - Future.sequence(futures).map { rets => rets.forall(identity) } - } - - protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): 
QueryResult = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get - val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore => - (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree - }.map { edgeWithScore => - val label = queryRequest.queryParam.label - val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { - case "strong" => - val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) - (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) - case _ => - val oldEdge = edgeWithScore.edge - (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) - } - - val copiedEdge = - edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) - - val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) -// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") - edgeToDelete - } - - queryResult.copy(edgeWithScoreLs = edgeWithScoreLs) - } - - protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = { - val queryResultLs = queryRequestWithResultLs.map(_.queryResult) - queryResultLs.foreach { queryResult => - if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.") - } - val futures = for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs) - if deleteQueryResult.edgeWithScoreLs.nonEmpty - } yield { - val label = queryRequest.queryParam.label - label.schemaVersion match { - case HBaseType.VERSION3 | HBaseType.VERSION4 => - if (label.consistencyLevel == "strong") { - /** - * read: snapshotEdge on queryResult = O(N) - * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) - */ - mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) - } else { - deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) - } - case _ => - - /** - * read: x - * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) - */ - deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) - } - } - - if (futures.isEmpty) { - // all deleted. 
- Future.successful(true -> true) - } else { - Future.sequence(futures).map { rets => false -> rets.forall(identity) } - } - } - - protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = { - val future = for { - queryRequestWithResultLs <- getEdges(query) - (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs) - } yield { -// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") - (allDeleted, ret) - } - - Extensions.retryOnFailure(MaxRetryNum) { - future - } { - logger.error(s"fetch and deleteAll failed.") - (true, false) - } - - } - - def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], - labels: Seq[Label], - dir: Int, - ts: Long): Future[Boolean] = { - - def enqueueLogMessage() = { - val kafkaMessages = for { - vertice <- srcVertices - id = vertice.innerId.toIdString() - label <- labels - } yield { - val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") - val topic = ExceptionHandler.failTopic - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg - } - - ExceptionHandler.enqueues(kafkaMessages) - } - - val requestTs = ts - val queryParams = for { - label <- labels - } yield { - val labelWithDir = LabelWithDirection(label.id.get, dir) - QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw)) - } - - val step = Step(queryParams.toList) - val q = Query(srcVertices, Vector(step)) - - // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { - val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) { - fetchAndDeleteAll(q, requestTs) - } { case (allDeleted, deleteSuccess) => - allDeleted && deleteSuccess - }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } - - retryFuture onFailure { - case ex => - logger.error(s"[Error]: deleteAllAdjacentEdges failed.") - enqueueLogMessage() - } - - retryFuture - } - - /** End Of Delete All */ - - - - - /** Parsing Logic: parse from kv from Storage into Edge */ - def toEdge[K: CanSKeyValue](kv: K, - queryParam: QueryParam, - cacheElementOpt: Option[IndexEdge], - parentEdges: Seq[EdgeWithScore]): Option[Edge] = { -// logger.debug(s"toEdge: $kv") - try { - val schemaVer = queryParam.label.schemaVersion - val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) - indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges)) - } catch { - case ex: Exception => - logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}", ex) - None - } - } - - def toSnapshotEdge[K: CanSKeyValue](kv: K, - queryParam: QueryParam, - cacheElementOpt: Option[SnapshotEdge] = None, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Option[Edge] = { -// logger.debug(s"SnapshottoEdge: $kv") - val schemaVer = queryParam.label.schemaVersion - val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) - - if (isInnerCall) { - snapshotEdgeOpt.flatMap { snapshotEdge => - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) - if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) - else None - } - } else { - snapshotEdgeOpt.flatMap { snapshotEdge => - if (Edge.allPropsDeleted(snapshotEdge.props)) None - else { - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) - if 
(queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) - else None - } - } - } - } - - def toEdges[K: CanSKeyValue](kvs: Seq[K], - queryParam: QueryParam, - prevScore: Double = 1.0, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { - if (kvs.isEmpty) Seq.empty - else { - val first = kvs.head - val kv = first - val schemaVer = queryParam.label.schemaVersion - val cacheElementOpt = - if (queryParam.isSnapshotEdge) None - else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None) - - for { - kv <- kvs - edge <- - if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges) - else toEdge(kv, queryParam, cacheElementOpt, parentEdges) - } yield { - //TODO: Refactor this. - val currentScore = - queryParam.scorePropagateOp match { - case "plus" => edge.rank(queryParam.rank) + prevScore - case _ => edge.rank(queryParam.rank) * prevScore - } - EdgeWithScore(edge, currentScore) - } - } - } - - /** End Of Parse Logic */ - -// /** methods for consistency */ -// protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { -// if (elementRpcs.isEmpty) { -// Future.successful(true) -// } else { -// val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) } -// Future.sequence(futures).map(_.forall(identity)) -// } -// } - - case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends Exception - - protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") - logger.debug(msg) - } - - protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", - s"${edgeMutate.toLogString}").mkString("\n") - logger.debug(msg) - } - - protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[SKeyValue]) = { - val currentTs = System.currentTimeMillis() - val lockTs = snapshotEdgeOpt match { - case None => Option(currentTs) - case Some(snapshotEdge) => - snapshotEdge.pendingEdgeOpt match { - case None => Option(currentTs) - case Some(pendingEdge) => pendingEdge.lockTs - } - } - val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1 - // snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1 - val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs) - val base = snapshotEdgeOpt match { - case None => - // no one ever mutated on this snapshotEdge. - edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - case Some(snapshotEdge) => - // there is at least one mutation have been succeed. 
- snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - } - base.copy(version = newVersion, statusCode = 1, lockTs = None) - } - - protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge, - edgeMutate: EdgeMutate) = { - val newVersion = lockEdge.version + 1 - val base = edgeMutate.newSnapshotEdge match { - case None => - // shouldReplace false - assert(snapshotEdgeOpt.isDefined) - snapshotEdgeOpt.get.toSnapshotEdge - case Some(newSnapshotEdge) => newSnapshotEdge - } - base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None) - } - - protected def acquireLock(statusCode: Byte, - edge: Edge, - oldSnapshotEdgeOpt: Option[Edge], - lockEdge: SnapshotEdge, - oldBytes: Array[Byte]): Future[Boolean] = { - if (statusCode >= 1) { - logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p") - else { - val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head - val oldPut = oldSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head) -// val lockEdgePut = buildPutAsync(lockEdge).head -// val oldPut = oldSnapshotEdgeOpt.map(e => buildPutAsync(e.toSnapshotEdge).head) - writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception => - logger.error(s"AcquireLock RPC Failed.") - throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed") - }.map { ret => - if (ret) { - val log = Seq( - "\n", - "=" * 50, - s"[Success]: acquireLock", - s"[RequestEdge]: ${edge.toLogString}", - s"[LockEdge]: ${lockEdge.toLogString()}", - s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", - "=" * 50, "\n").mkString("\n") - - logger.debug(log) - // debug(ret, "acquireLock", edge.toSnapshotEdge) - } else { - throw new PartialFailureException(edge, 0, "hbase fail.") - } - true - } - } - } - } - - - - protected def releaseLock(predicate: Boolean, - edge: Edge, - lockEdge: SnapshotEdge, - releaseLockEdge: SnapshotEdge, - _edgeMutate: EdgeMutate, - oldBytes: Array[Byte]): Future[Boolean] = { - if (!predicate) { - throw new PartialFailureException(edge, 3, "predicate failed.") - } - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p") - else { - val releaseLockEdgePut = snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head - val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head - writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith { - case ex: Exception => - logger.error(s"ReleaseLock RPC Failed.") - throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed") - }.map { ret => - if (ret) { - debug(ret, "releaseLock", edge.toSnapshotEdge) - } else { - val msg = Seq("\nFATAL ERROR\n", - "=" * 50, - oldBytes.toList, - lockEdgePut, - releaseLockEdgePut, - // lockEdgePut.value.toList, - // releaseLockEdgePut.value().toList, - "=" * 50, - "\n" - ) - logger.error(msg.mkString("\n")) - // error(ret, "releaseLock", edge.toSnapshotEdge) - throw new PartialFailureException(edge, 3, "hbase fail.") - } - true - } - } - Future.successful(true) - } - - - protected def mutate(predicate: Boolean, - edge: Edge, - statusCode: Byte, - _edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) throw new PartialFailureException(edge, 1, "predicate failed.") - - if (statusCode >= 2) { - logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } 
else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p") - else - writeToStorage(edge.label.hbaseZkAddr, indexedEdgeMutations(_edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate) - } else { - throw new PartialFailureException(edge, 1, "hbase fail.") - } - true - } - } - } - - protected def increment(predicate: Boolean, - edge: Edge, - statusCode: Byte, _edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) throw new PartialFailureException(edge, 2, "predicate failed.") - if (statusCode >= 3) { - logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p") - else - writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate) - } else { - throw new PartialFailureException(edge, 2, "hbase fail.") - } - true - } - } - } - - - /** this may be overrided by specific storage implementation */ - protected def commitProcess(edge: Edge, statusCode: Byte) - (snapshotEdgeOpt: Option[Edge], kvOpt: Option[SKeyValue]) - (lockEdge: SnapshotEdge, releaseLockEdge: SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = { - val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte]) - for { - locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge, oldBytes) - mutated <- mutate(locked, edge, statusCode, _edgeMutate) - incremented <- increment(mutated, edge, statusCode, _edgeMutate) - released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, _edgeMutate, oldBytes) - } yield { - released - } - } - - protected def commitUpdate(edge: Edge, - statusCode: Byte)(snapshotEdgeOpt: Option[Edge], - kvOpt: Option[SKeyValue], - edgeUpdate: EdgeMutate): Future[Boolean] = { - val label = edge.label - def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty) - - val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt) - val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate) - val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_ - snapshotEdgeOpt match { - case None => - // no one ever did success on acquire lock. - _process(lockEdge, releaseLockEdge, edgeUpdate) - // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) - case Some(snapshotEdge) => - // someone did success on acquire lock at least one. 
- snapshotEdge.pendingEdgeOpt match { - case None => - // not locked - _process(lockEdge, releaseLockEdge, edgeUpdate) - // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) - case Some(pendingEdge) => - def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() - if (isLockExpired) { - val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) - val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge)) - val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt) - val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate) - commitProcess(edge, statusCode = 0)(snapshotEdgeOpt, kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret => - // process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret => - val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}" - throw new PartialFailureException(edge, 0, log) - } - } else { - // locked - if (pendingEdge.ts == edge.ts && statusCode > 0) { - // self locked - val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) - val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(edge)) - val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, newEdgeUpdate) - - /** lockEdge will be ignored */ - _process(lockEdge, newReleaseLockEdge, newEdgeUpdate) - // process(lockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode) - } else { - throw new PartialFailureException(edge, statusCode, s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]") - } - } - } - } - } - - /** end of methods for consistency */ - - - // def futureCache[T] = Cache[Long, (Long, T)] - - protected def toRequestEdge(queryRequest: QueryRequest): Edge = { - val srcVertex = queryRequest.vertex - // val tgtVertexOpt = queryRequest.tgtVertexOpt - val edgeCf = Serializable.edgeCf - val queryParam = queryRequest.queryParam - val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt - val label = queryParam.label - val labelWithDir = queryParam.labelWithDir - val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) - val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match { - case Some(tgtVertexId) => // _to is given. 
- /** we use toSnapshotEdge so dont need to swap src, tgt */ - val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion) - (src, tgt) - case None => - val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - (src, src) - } - - val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId)) - val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) - val currentTs = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)).toMap - Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs) - } - - - - protected def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = { - val labelWithDir = edge.labelWithDir - val queryParam = QueryParam(labelWithDir) - val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId)) - val q = Query.toQuery(Seq(edge.srcVertex), _queryParam) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam) - - fetchSnapshotEdgeKeyValues(buildRequest(queryRequest)).map { kvs => - val (edgeOpt, kvOpt) = - if (kvs.isEmpty) (None, None) - else { - val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, parentEdges = Nil).headOption.map(_.edge) - val _kvOpt = kvs.headOption - (_edgeOpt, _kvOpt) - } - (queryParam, edgeOpt, kvOpt) - } recoverWith { case ex: Throwable => - logger.error(s"fetchQueryParam failed. fallback return.", ex) - throw new FetchTimeoutException(s"${edge.toLogString}") - } - } - - protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = { - if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil) - else { - val queryRequest = queryRequestWithResultsLs.head.queryRequest - val q = orgQuery - val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult) - - val stepIdx = queryRequest.stepIdx + 1 - - val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None - val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) - val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) - val step = q.steps(stepIdx) - val alreadyVisited = - if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean] - else Graph.alreadyVisitedVertices(queryResultsLs) - - val groupedBy = queryResultsLs.flatMap { queryResult => - queryResult.edgeWithScoreLs.map { case edgeWithScore => - edgeWithScore.edge.tgtVertex -> edgeWithScore - } - }.groupBy { case (vertex, edgeWithScore) => vertex } - - val groupedByFiltered = for { - (vertex, edgesWithScore) <- groupedBy - aggregatedScore = edgesWithScore.map(_._2.score).sum if aggregatedScore >= prevStepThreshold - } yield vertex -> aggregatedScore - - val prevStepTgtVertexIdEdges = for { - (vertex, edgesWithScore) <- groupedBy - } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) => edgeWithScore } - - val nextStepSrcVertices = if (prevStepLimit >= 0) { - groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) - } else { - groupedByFiltered.toSeq - } - - val queryRequests = for { - (vertex, prevStepScore) <- nextStepSrcVertices - queryParam <- step.queryParams - } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore) - - Graph.filterEdges(fetches(queryRequests, 
prevStepTgtVertexIdEdges), alreadyVisited)(ec) - } - } - - protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = { - for { - queryRequestWithResultLs <- queryRequestWithResultLsFuture - ret <- fetchStep(orgQuery, queryRequestWithResultLs) - } yield ret - } - - def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = { - val fallback = { - val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty) - Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult()))) - } - Try { - - if (q.steps.isEmpty) { - // TODO: this should be get vertex query. - fallback - } else { - // current stepIdx = -1 - val startQueryResultLs = QueryResult.fromVertices(q) - q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) => - fetchStepFuture(q, acc) -// fetchStepFuture(q, acc).map { stepResults => -// step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult) => -// val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor) -// qParam.cursorOpt = Option(cursor) -// } -// stepResults -// } - } - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } - - def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = { - val ts = System.currentTimeMillis() - val futures = for { - (srcVertex, tgtVertex, queryParam) <- params - propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, queryParam.label.schemaVersion)) - edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs) - } yield { - fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => - val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId)) - val q = Query.toQuery(Seq(edge.srcVertex), _queryParam) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam) - val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))) - QueryRequestWithResult(queryRequest, queryResult) - } - } - - Future.sequence(futures) - } - - - - @tailrec - final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (range < sampleNumber || set.size == sampleNumber) set - else randomInt(sampleNumber, range, set + Random.nextInt(range)) - } - - protected def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { - if (edges.size <= n){ - edges - }else{ - val plainEdges = if (queryRequest.queryParam.offset == 0) { - edges.tail - } else edges - - val randoms = randomInt(n, plainEdges.size) - var samples = List.empty[EdgeWithScore] - var idx = 0 - plainEdges.foreach { e => - if (randoms.contains(idx)) samples = e :: samples - idx += 1 - } - samples.toSeq - } - - } - /** end of query */ - - /** Mutation Builder */ - - - /** EdgeMutate */ - def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = { - val deleteMutations = edgeMutate.edgesToDelete.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) - } - val insertMutations = edgeMutate.edgesToInsert.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - } - - deleteMutations ++ insertMutations - } - - def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = - edgeMutate.newSnapshotEdge.map(e 
=> snapshotEdgeSerializer(e).toKeyValues).getOrElse(Nil) - - def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] = - (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match { - case (true, true) => - - /** when there is no need to update. shouldUpdate == false */ - List.empty - case (true, false) => - - /** no edges to delete but there is new edges to insert so increase degree by 1 */ - edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) } - case (false, true) => - - /** no edges to insert but there is old edges to delete so decrease degree by 1 */ - edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) } - case (false, false) => - - /** update on existing edges so no change on degree */ - List.empty - } - - /** IndexEdge */ - def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) - val _indexedEdge = indexedEdge.copy(props = newProps) - indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) - } - - def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) - val _indexedEdge = indexedEdge.copy(props = newProps) - indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) - } - def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = { - val kvs = vertexSerializer(vertex).toKeyValues - val kv = kvs.head - vertex.belongLabelIds.map { id => - kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)), operation = SKeyValue.Delete) - } - } - - def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = - if (edge.op == GraphUtil.operations("delete")) - buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex) - else - vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues - - def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = { - vertex.op match { - case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) - case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - } - } -}
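[Editor's note] The consistency path in Storage.scala above (commitUpdate -> commitProcess -> acquireLock / mutate / increment / releaseLock, wrapped in retry() with random backoff) reduces to a compare-and-set loop on the snapshot edge. The following is a minimal, self-contained sketch of that control flow only, assuming an in-memory CAS cell in place of HBase; Snapshot, InMemoryStore, and Committer are illustrative stand-ins, not S2Graph types.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

// Illustrative stand-in for the snapshot-edge cell; the real SnapshotEdge
// carries a pendingEdgeOpt, statusCode, and lockTs instead of a flag.
case class Snapshot(value: String, locked: Boolean)

class InMemoryStore(implicit ec: ExecutionContext) {
  private var cell: Option[Snapshot] = None

  // compareAndSet: write `next` only if the stored cell still equals `expected`,
  // playing the role of writeLock() backed by HBase's checkAndSet.
  def casPut(expected: Option[Snapshot], next: Snapshot): Future[Boolean] =
    Future(synchronized {
      if (cell == expected) { cell = Some(next); true } else false
    })

  def get: Future[Option[Snapshot]] = Future(synchronized(cell))
}

class Committer(store: InMemoryStore, maxRetryNum: Int, maxBackOffMs: Int)
               (implicit ec: ExecutionContext) {

  // Mirrors commitProcess: read snapshot, install lock via CAS, apply the
  // mutation, then release the lock with a second CAS.
  private def commitOnce(newValue: String): Future[Boolean] =
    store.get.flatMap {
      case Some(s) if s.locked => Future.successful(false) // someone else holds the lock
      case current =>
        val lockEdge = Snapshot(current.map(_.value).getOrElse(""), locked = true)
        store.casPut(current, lockEdge).flatMap {
          case false => Future.successful(false) // lost the CAS race: retry later
          case true  => // index-edge mutations and increments would run here
            store.casPut(Some(lockEdge), Snapshot(newValue, locked = false))
        }
    }

  // Mirrors retry(): bounded retries with random backoff, as in
  // Thread.sleep(Random.nextInt(MaxBackOff)) above. Assumes maxBackOffMs > 0.
  def commit(newValue: String, tryNum: Int = 1): Future[Boolean] =
    if (tryNum > maxRetryNum) Future.successful(false)
    else commitOnce(newValue).flatMap { ok =>
      if (ok) Future.successful(true)
      else {
        Thread.sleep(Random.nextInt(maxBackOffMs) + 1)
        commit(newValue, tryNum + 1)
      }
    }
}

// usage (hypothetical, given an implicit ExecutionContext):
//   new Committer(new InMemoryStore, maxRetryNum = 3, maxBackOffMs = 10).commit("v1")

Where the sketch returns false and loops, the real code throws PartialFailureException with a statusCode (0 = acquire, 1 = mutate, 2 = increment, 3 = release) so a retry can resume from the failed phase rather than from scratch.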
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala deleted file mode 100644 index 4b3300a..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala +++ /dev/null @@ -1,95 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.QueryParam -import com.kakao.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs} -import org.apache.hadoop.hbase.util.Bytes - -object StorageDeserializable { - /** Deserializer */ - def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { - val byte = bytes(offset) - val isInverted = if ((byte & 1) != 0) true else false - val labelOrderSeq = byte >> 1 - (labelOrderSeq.toByte, isInverted) - } - - def bytesToKeyValues(bytes: Array[Byte], - offset: Int, - length: Int, - version: String): (Array[(Byte, InnerValLike)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(Byte, InnerValLike)](len) - var i = 0 - while (i < len) { - val k = bytes(pos) - pos += 1 - val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - val ret = (kvs, pos) - // logger.debug(s"bytesToProps: $ret") - ret - } - - def bytesToKeyValuesWithTs(bytes: Array[Byte], - offset: Int, - version: String): (Array[(Byte, InnerValLikeWithTs)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(Byte, InnerValLikeWithTs)](len) - var i = 0 - while (i < len) { - val k = bytes(pos) - pos += 1 - val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - val ret = (kvs, pos) - // logger.debug(s"bytesToProps: $ret") - ret - } - - def bytesToProps(bytes: Array[Byte], - offset: Int, - version: String): (Array[(Byte, InnerValLike)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(Byte, InnerValLike)](len) - var i = 0 - while (i < len) { - val k = HBaseType.EMPTY_SEQ_BYTE - val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - // logger.error(s"bytesToProps: $kvs") - val ret = (kvs, pos) - - ret - } - - def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset) -} - -trait StorageDeserializable[E] { - def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = { - try { - Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt)) - } catch { - case e: Exception => - logger.error(s"${this.getClass.getName} fromKeyValues failed.", e) - None - } - } - def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala deleted file mode 100644 index 575f4ab..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageSerializable.scala +++ /dev/null @@ -1,41 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.types.{InnerValLikeWithTs, InnerValLike} -import org.apache.hadoop.hbase.util.Bytes - -object StorageSerializable { - /** serializer */ - def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes) - bytes - } - - def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes) - bytes - } - - def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes) - bytes - } - - def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { - assert(labelOrderSeq < (1 << 6)) - val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) - Array.fill(1)(byte.toByte) - } -} - -trait StorageSerializable[E] { - def toKeyValues: Seq[SKeyValue] -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala deleted file mode 100644 index 8441c6b..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ /dev/null @@ -1,533 +0,0 @@ -package com.kakao.s2graph.core.storage.hbase - - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.storage._ -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.{FutureCache, DeferCache, Extensions, logger} -import com.stumbleupon.async.Deferred -import com.typesafe.config.{ConfigFactory, Config} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.regionserver.BloomType -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} -import org.apache.hadoop.security.UserGroupInformation -import org.hbase.async._ -import scala.collection.JavaConversions._ -import scala.collection.{Map, Seq} -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future, duration} -import scala.util.hashing.MurmurHash3 -import java.util -import java.util.Base64 - - -object AsynchbaseStorage { - val vertexCf = Serializable.vertexCf - val edgeCf = Serializable.edgeCf - val emptyKVs = new util.ArrayList[KeyValue]() - - - def makeClient(config: Config, overrideKv: (String, String)*) = { - val asyncConfig: org.hbase.async.Config = - if 
(config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { - val krb5Conf = config.getString("java.security.krb5.conf") - val jaas = config.getString("java.security.auth.login.config") - - System.setProperty("java.security.krb5.conf", krb5Conf) - System.setProperty("java.security.auth.login.config", jaas) - new org.hbase.async.Config() - } else { - new org.hbase.async.Config() - } - - for (entry <- config.entrySet() if entry.getKey.contains("hbase")) { - asyncConfig.overrideConfig(entry.getKey, entry.getValue.unwrapped().toString) - } - - for ((k, v) <- overrideKv) { - asyncConfig.overrideConfig(k, v) - } - - val client = new HBaseClient(asyncConfig) - logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}") - client - } -} - - -class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext) - extends Storage[Deferred[QueryRequestWithResult]](config) { - - import Extensions.DeferOps - - /** - * Asynchbase client setup. - * note that we need two client, one for bulk(withWait=false) and another for withWait=true - */ - val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0"))) - val client = AsynchbaseStorage.makeClient(config) - - private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") - private val clients = Seq(client, clientWithFlush) - private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort - private val emptyKeyValues = new util.ArrayList[KeyValue]() - private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client - - /** Future Cache to squash request */ - private val futureCache = new DeferCache[QueryResult](config)(ec) - - /** Simple Vertex Cache */ - private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec) - - - /** - * fire rpcs into proper hbase cluster using client and - * return true on all mutation success. otherwise return false. - */ - override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { - if (kvs.isEmpty) Future.successful(true) - else { - val _client = client(withWait) - val futures = kvs.map { kv => - val _defer = kv.operation match { - case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)) - case SKeyValue.Delete => - if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp)) - else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp)) - case SKeyValue.Increment => - _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))) - } - val future = _defer.withCallback { ret => true }.recoverWith { ex => - logger.error(s"mutation failed. $kv", ex) - false - }.toFuture - - if (withWait) future else Future.successful(true) - } - - Future.sequence(futures).map(_.forall(identity)) - } - } - - - override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = { - val defer = fetchKeyValuesInner(hbaseRpc) - defer.toFuture.map { kvsArr => - kvsArr.map { kv => - implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - } toSeq - } - } - - /** - * since HBase natively provide CheckAndSet on storage level, implementation becomes simple. - * @param rpc: key value that is need to be stored on storage. 
-
-
-class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
-  extends Storage[Deferred[QueryRequestWithResult]](config) {
-
-  import Extensions.DeferOps
-
-  /**
-   * Asynchbase client setup.
-   * note that we need two clients: one for bulk (withWait=false) and another for withWait=true.
-   */
-  val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0")))
-  val client = AsynchbaseStorage.makeClient(config)
-
-  private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
-  private val clients = Seq(client, clientWithFlush)
-  private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
-  private val emptyKeyValues = new util.ArrayList[KeyValue]()
-  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
-
-  /** future cache to squash requests */
-  private val futureCache = new DeferCache[QueryResult](config)(ec)
-
-  /** simple vertex cache */
-  private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
-
-
-  /**
-   * fire rpcs into the proper hbase cluster using client and
-   * return true on all mutation success, otherwise return false.
-   */
-  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
-    if (kvs.isEmpty) Future.successful(true)
-    else {
-      val _client = client(withWait)
-      val futures = kvs.map { kv =>
-        val _defer = kv.operation match {
-          case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp))
-          case SKeyValue.Delete =>
-            if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
-            else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp))
-          case SKeyValue.Increment =>
-            _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
-        }
-        val future = _defer.withCallback { ret => true }.recoverWith { ex =>
-          logger.error(s"mutation failed. $kv", ex)
-          false
-        }.toFuture
-
-        if (withWait) future else Future.successful(true)
-      }
-
-      Future.sequence(futures).map(_.forall(identity))
-    }
-  }
-
-
-  override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = {
-    val defer = fetchKeyValuesInner(hbaseRpc)
-    defer.toFuture.map { kvsArr =>
-      kvsArr.map { kv =>
-        implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-      } toSeq
-    }
-  }
-
-  /**
-   * since HBase natively provides CheckAndSet at the storage level, the implementation becomes simple.
-   * @param rpc: key value that needs to be stored on storage.
-   * @param expectedOpt: last valid value for rpc's KeyValue.value from fetching.
-   * @return true if the expected value matches and our rpc is successfully applied, otherwise false.
-   *         note that when some other thread has modified the same cell and it holds a different value,
-   *         HBase atomically returns false.
-   */
-  override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = {
-    val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
-    val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
-    client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture
-  }
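// Sketch (not in the original source): writeLock maps one-to-one onto Asynchbase's
// compareAndSet, where an empty expected byte array asserts that the cell does not
// exist yet. A caller that loses the race would typically re-read and retry, roughly
// like this; every name except writeLock and SKeyValue is illustrative.
import scala.concurrent.{ExecutionContext, Future}

def acquireWithRetry(storage: AsynchbaseStorage,
                     lockKv: SKeyValue,
                     expectedOpt: Option[SKeyValue],
                     retriesLeft: Int)(implicit ec: ExecutionContext): Future[Boolean] =
  storage.writeLock(lockKv, expectedOpt).flatMap {
    case true => Future.successful(true) // CAS won: the expected value matched atomically.
    case false if retriesLeft > 0 =>
      // CAS lost: another writer changed the cell in between; retry with backoff in practice.
      acquireWithRetry(storage, lockKv, expectedOpt, retriesLeft - 1)
    case false => Future.successful(false)
  }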
-
-
-  /**
-   * given queryRequest, build storage specific RPC Request.
-   * In HBase case, we either build Scanner or GetRequest.
-   *
-   * IndexEdge layer:
-   *    Tall schema(v4): use scanner.
-   *    Wide schema(label's schema version in v1, v2, v3): use GetRequest with columnRangeFilter
-   *                                                       when query is given with interval option.
-   * SnapshotEdge layer:
-   *    Tall schema(v3, v4): use GetRequest without column filter.
-   *    Wide schema(label's schema version in v1, v2): use GetRequest with columnRangeFilter.
-   * Vertex layer:
-   *    all versions: use GetRequest without column filter.
-   * @param queryRequest
-   * @return Scanner or GetRequest with StartKey, EndKey and RangeFilter set up properly.
-   */
-  override def buildRequest(queryRequest: QueryRequest): AnyRef = {
-    import Serializable._
-    val queryParam = queryRequest.queryParam
-    val label = queryParam.label
-    val edge = toRequestEdge(queryRequest)
-
-    val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-      val snapshotEdge = edge.toSnapshotEdge
-      snapshotEdgeSerializer(snapshotEdge).toKeyValues.head
-      // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
-    } else {
-      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq)
-      assert(indexedEdgeOpt.isDefined)
-
-      val indexedEdge = indexedEdgeOpt.get
-      indexEdgeSerializer(indexedEdge).toKeyValues.head
-    }
-
-    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
-
-    label.schemaVersion match {
-      case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
-        val scanner = client.newScanner(label.hbaseTableName.getBytes)
-        scanner.setFamily(edgeCf)
-
-        /**
-         * TODO: remove this part.
-         */
-        val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
-        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam"))
-
-        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-        val labelWithDirBytes = indexEdge.labelWithDir.bytes
-        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-        // val labelIndexSeqWithIsInvertedStopBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = true)
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
-        val (startKey, stopKey) =
-          if (queryParam.columnRangeFilter != null) {
-            // interval is set.
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
-              case None => Bytes.add(baseKey, queryParam.columnRangeFilterMinBytes)
-            }
-            (_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes))
-          } else {
-            /**
-             * note: since propsToBytes encodes the size of the property map in its first byte, we are sure about the max value here.
-             */
-            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
-              case None => baseKey
-            }
-            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
-          }
-//        logger.debug(s"[StartKey]: ${startKey.toList}")
-//        logger.debug(s"[StopKey]: ${stopKey.toList}")
-
-        scanner.setStartKey(startKey)
-        scanner.setStopKey(stopKey)
-
-        if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
-
-        scanner.setMaxVersions(1)
-        scanner.setMaxNumRows(queryParam.limit)
-        scanner.setMaxTimestamp(maxTs)
-        scanner.setMinTimestamp(minTs)
-        scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
-        // SET options for this rpc properly.
-        scanner
-      case _ =>
-        val get =
-          if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier)
-          else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf)
-
-        get.maxVersions(1)
-        get.setFailfast(true)
-        get.setMaxResultsPerColumnFamily(queryParam.limit)
-        get.setRowOffsetPerColumnFamily(queryParam.offset)
-        get.setMinTimestamp(minTs)
-        get.setMaxTimestamp(maxTs)
-        get.setTimeout(queryParam.rpcTimeoutInMillis)
-
-        if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter)
-
-        get
-    }
-  }
-
-  /**
-   * we are using the future cache to squash requests for the same key into one storage call.
-   *
-   * @param queryRequest
-   * @param prevStepScore
-   * @param isInnerCall
-   * @param parentEdges
-   * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future.
-   *         seems like map, flatMap on scala.concurrent.Future are slower than Deferred's addCallback.
-   */
-  override def fetch(queryRequest: QueryRequest,
-                     prevStepScore: Double,
-                     isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
-
-    def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
-      fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
-        val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
-        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
-          sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
-        } else edgeWithScores
-        QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
-//        QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
-
-      } recoverWith { ex =>
-        logger.error(s"fetchInner failed. fallback return. $hbaseRpc", ex)
-        QueryResult(isFailure = true)
-//        QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
-      }
-    }
-
-    val queryParam = queryRequest.queryParam
-    val cacheTTL = queryParam.cacheTTLInMillis
-    val request = buildRequest(queryRequest)
-
-    val defer =
-      if (cacheTTL <= 0) fetchInner(request)
-      else {
-        val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
-        val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
-        futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
-      }
-    defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult) }
-  }
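// Sketch (not in the original source): the tailCursor returned in QueryResult is the
// raw row key of the last cell fetched, and buildRequest resumes a cursor by appending
// a single 0x00 byte to its Base64-decoded bytes. The round trip, with illustrative
// helper names:
import java.util.Base64
import org.apache.hadoop.hbase.util.Bytes

def encodeCursor(tailCursor: Array[Byte]): String =
  Base64.getEncoder.encodeToString(tailCursor)

def nextStartKey(cursor: String): Array[Byte] =
  // Appending 0x00 yields the smallest key strictly greater than the cursor row,
  // so the row already returned is not scanned twice.
  Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0.toByte))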
-
-
-  override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
-                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = {
-    val defers: Seq[Deferred[QueryRequestWithResult]] = for {
-      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
-      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
-    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
-
-    val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers)
-    grouped withCallback {
-      queryResults: util.ArrayList[QueryRequestWithResult] =>
-        queryResults.toIndexedSeq
-    } toFuture
-  }
-
-
-  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request)
-
-
-  /**
-   * when withWait is given, we use the client whose flushInterval is set to 0.
-   * otherwise, the buffered client can add extra latency of up to flushInterval in the worst case.
-   *
-   * @param edges
-   * @param withWait
-   * @return
-   */
-  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
-    val _client = client(withWait)
-    val defers: Seq[Deferred[(Boolean, Long)]] = for {
-      edge <- edges
-    } yield {
-      val edgeWithIndex = edge.edgesWithIndex.head
-      val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
-      val countVal = countWithTs.innerVal.toString().toLong
-      val incr = buildIncrementsCountAsync(edgeWithIndex, countVal).head
-      val request = incr.asInstanceOf[AtomicIncrementRequest]
-      _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
-        (true, resultCount.longValue())
-      } recoverWith { ex =>
-        logger.error(s"mutation failed. $request", ex)
-        (false, -1L)
-      }
-    }
-
-    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
-    grouped.toFuture.map(_.toSeq)
-  }
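// Usage sketch (illustrative, assuming an implicit ExecutionContext and a counter-type
// label whose edges carry LabelMeta.countSeq): with withWait = true the zero-flush
// client is used, so the returned counts do not wait out the buffered flush interval.
val countsF: Future[Seq[(Boolean, Long)]] = storage.incrementCounts(counterEdges, withWait = true)
countsF.foreach { results =>
  results.zip(counterEdges).foreach { case ((ok, newCount), edge) =>
    if (!ok) logger.error(s"increment failed for $edge, count=$newCount")
  }
}
// Note the use of Deferred.groupInOrder above: it keeps the i-th result aligned with
// the i-th edge, which is what makes this positional zip safe.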
-
-
-  override def flush(): Unit = clients.foreach { client =>
-    val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
-    Await.result(client.flush().toFuture, timeout)
-  }
-
-
-  override def createTable(zkAddr: String,
-                           tableName: String,
-                           cfs: List[String],
-                           regionMultiplier: Int,
-                           ttl: Option[Int],
-                           compressionAlgorithm: String): Unit = {
-    logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
-    val admin = getAdmin(zkAddr)
-    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    if (!admin.tableExists(TableName.valueOf(tableName))) {
-      try {
-        val desc = new HTableDescriptor(TableName.valueOf(tableName))
-        desc.setDurability(Durability.ASYNC_WAL)
-        for (cf <- cfs) {
-          val columnDesc = new HColumnDescriptor(cf)
-            .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-            .setBloomFilterType(BloomType.ROW)
-            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-            .setMaxVersions(1)
-            .setTimeToLive(2147483647)
-            .setMinVersions(0)
-            .setBlocksize(32768)
-            .setBlockCacheEnabled(true)
-          if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-          desc.addFamily(columnDesc)
-        }
-
-        if (regionCount <= 1) admin.createTable(desc)
-        else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-      } catch {
-        case e: Throwable =>
-          logger.error(s"$zkAddr, $tableName failed with $e", e)
-          throw e
-      }
-    } else {
-      logger.info(s"$zkAddr, $tableName, $cfs already exist.")
-    }
-  }
-
-
-  /** the Asynchbase implementation overrides the default getVertices to use the future cache. */
-  override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[SKeyValue],
-                   version: String): Option[Vertex] = {
-
-      if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
-    }
-
-    val futures = vertices.map { vertex =>
-      val kvs = vertexSerializer(vertex).toKeyValues
-      val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
-      // get.setTimeout(this.singleGetTimeout.toShort)
-      get.setFailfast(true)
-      get.maxVersions(1)
-
-      val cacheKey = MurmurHash3.stringHash(get.toString)
-      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs =>
-        fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
-      }
-    }
-
-    Future.sequence(futures).map { result => result.toList.flatten }
-  }
-
-
-
-
-
-  /**
-   * private methods specific to the Asynchbase implementation.
-   */
-  private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = {
-    rpc match {
-      case getRequest: GetRequest => client.get(getRequest)
-      case scanner: Scanner =>
-        scanner.nextRows().withCallback { kvsLs =>
-          val ls = new util.ArrayList[KeyValue]
-          if (kvsLs != null) {
-            kvsLs.foreach { kvs =>
-              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
-            }
-          }
-          scanner.close()
-          ls
-        }.recoverWith { ex =>
-          logger.error(s"fetchKeyValuesInner failed.", ex)
-          scanner.close()
-          emptyKeyValues
-        }
-      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
-    }
-  }
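// A note on the Scanner branch above (sketch, not in the original source): only the
// first nextRows() batch is consumed before the scanner is closed, which is enough
// here because buildRequest already capped the batch via scanner.setMaxNumRows(limit).
// A fully draining scan would instead loop until nextRows() returns null, roughly:
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}

def drainScanner(scanner: Scanner)(implicit ec: ExecutionContext): Future[util.ArrayList[KeyValue]] = {
  val acc = new util.ArrayList[KeyValue]()
  def loop(): Future[util.ArrayList[KeyValue]] =
    scanner.nextRows().toFuture.flatMap { rows =>
      if (rows == null) { scanner.close(); Future.successful(acc) } // end of the key range
      else {
        for (kvs <- rows; kv <- kvs) acc.add(kv)
        loop()
      }
    }
  loop()
}
// toFuture here comes from Extensions.DeferOps, as used throughout this file.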
-
-  private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = {
-    hbaseRpc match {
-      case getRequest: GetRequest => getRequest.key()
-      case scanner: Scanner => scanner.getCurrentKey()
-      case _ =>
-        logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
-        Array.empty[Byte]
-    }
-  }
-
-  private def getSecureClusterAdmin(zkAddr: String) = {
-    val jaas = config.getString("java.security.auth.login.config")
-    val krb5Conf = config.getString("java.security.krb5.conf")
-    val realm = config.getString("realm")
-    val principal = config.getString("principal")
-    val keytab = config.getString("keytab")
-
-
-
-    System.setProperty("java.security.auth.login.config", jaas)
-    System.setProperty("java.security.krb5.conf", krb5Conf)
-//    System.setProperty("sun.security.krb5.debug", "true")
-//    System.setProperty("sun.security.spnego.debug", "true")
-    val conf = new Configuration(true)
-    val hConf = HBaseConfiguration.create(conf)
-
-    hConf.set("hbase.zookeeper.quorum", zkAddr)
-
-    hConf.set("hadoop.security.authentication", "Kerberos")
-    hConf.set("hbase.security.authentication", "Kerberos")
-    hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm)
-    hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm)
-
-    System.out.println("Connecting secure cluster, using keytab\n")
-    UserGroupInformation.setConfiguration(hConf)
-    UserGroupInformation.loginUserFromKeytab(principal, keytab)
-    val currentUser = UserGroupInformation.getCurrentUser()
-    System.out.println("current user : " + currentUser + "\n")
-
-    // get table list
-    val conn = ConnectionFactory.createConnection(hConf)
-    conn.getAdmin
-  }
-
-  /**
-   * the following configurations need to come together to use a secured hbase cluster.
-   * 1. set hbase.security.auth.enable = true
-   * 2. set the jaas file path in java.security.auth.login.config
-   * 3. set the kerberos file path in java.security.krb5.conf
-   * 4. set realm
-   * 5. set principal
-   * 6. set the keytab file path
-   * @param zkAddr
-   * @return
-   */
-  private def getAdmin(zkAddr: String) = {
-    if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) {
-      getSecureClusterAdmin(zkAddr)
-    } else {
-      val conf = HBaseConfiguration.create()
-      conf.set("hbase.zookeeper.quorum", zkAddr)
-      val conn = ConnectionFactory.createConnection(conf)
-      conn.getAdmin
-    }
-  }
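// Putting the six requirements above together, a matching configuration might look
// like this sketch; all paths and the realm are placeholders, while the key names
// mirror the config.getString calls in getSecureClusterAdmin.
import com.typesafe.config.ConfigFactory

val secureConfig = ConfigFactory.parseString(
  """
    |hbase.security.auth.enable = true
    |java.security.auth.login.config = "/etc/hbase/jaas.conf"
    |java.security.krb5.conf = "/etc/krb5.conf"
    |realm = "EXAMPLE.COM"
    |principal = "s2graph@EXAMPLE.COM"
    |keytab = "/etc/security/keytabs/s2graph.keytab"
  """.stripMargin)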
-
-  private def enableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
-  }
-
-  private def disableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-  }
-
-  private def dropTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-    getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
-  }
-
-  private def getStartKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount))
-  }
-
-  private def getEndKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
-  }
-}
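For the pre-split arithmetic above, a quick worked check may help; the numbers below follow directly from the two formulas (a sketch, assuming regionMultiplier produced regionCount = 4):

val regionCount = 4
val start = Int.MaxValue / regionCount                     // 536870911
val end   = Int.MaxValue / regionCount * (regionCount - 1) // 1610612733
// createTable then calls admin.createTable(desc, Bytes.toBytes(start), Bytes.toBytes(end), 4),
// asking HBase to split the [start, end] byte range evenly across the 4 regions.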
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
deleted file mode 100644
index 014a5c9..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.indexedge.tall
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.StorageDeserializable._
-import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
-  import StorageDeserializable._
-
-  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
-  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
-  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-//    val degree = Bytes.toLong(kv.value)
-    val degree = bytesToLongFunc(kv.value, 0)
-    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
-    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-  }
-
-  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
-    var qualifierLen = 0
-    var pos = 0
-    val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-      val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
-      pos = endAt
-      qualifierLen += endAt
-      val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-        (HBaseType.defaultTgtVertexId, 0)
-      } else {
-        TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
-      }
-      qualifierLen += tgtVertexIdLen
-      (props, endAt, tgtVertexId, tgtVertexIdLen)
-    }
-    val (op, opLen) =
-      if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-      else (kv.qualifier(qualifierLen), 1)
-
-    qualifierLen += opLen
-
-    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-  }
-
-  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
-    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
-    (props, endAt)
-  }
-
-  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
-    (Array.empty[(Byte, InnerValLike)], 0)
-  }
-
-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                                   _kvs: Seq[T],
-                                                   version: String,
-                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-
-    assert(_kvs.size == 1)
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-
-//    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
-    var pos = 0
-    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
-    pos += srcIdLen
-    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-    pos += 4
-    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-    pos += 1
-
-    val op = kv.row(pos)
-    pos += 1
-
-    if (pos == kv.row.length) {
-      // degree
-//      val degreeVal = Bytes.toLong(kv.value)
-      val degreeVal = bytesToLongFunc(kv.value, 0)
-      val ts = kv.timestamp
-      val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version),
-        LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version))
-      val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
-      IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
-    } else {
-      // not a degree edge
-      val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
-      val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
-      pos = endAt
-      val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
-        (HBaseType.defaultTgtVertexId, 0)
-      } else {
-        TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
-      }
-
-      val idxProps = for {
-        (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
-      } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
-
-      val idxPropsMap = idxProps.toMap
-
-      val tgtVertexId =
-        idxPropsMap.get(LabelMeta.toSeq) match {
-          case None => tgtVertexIdRaw
-          case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
-        }
-
-      val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
-//        val countVal = Bytes.toLong(kv.value)
-        val countVal = bytesToLongFunc(kv.value, 0)
-        val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
-        (dummyProps, 8)
-      } else {
-        bytesToKeyValues(kv.value, 0, kv.value.length, version)
-      }
-
-      val _mergedProps = (idxProps ++ props).toMap
-      val mergedProps =
-        if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
-        else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
-
-      val ts = kv.timestamp
-      IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
-    }
-  }
-}
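The injectable bytesToLongFunc (defaulting to StorageDeserializable.bytesToLong) is what replaced the commented-out Bytes.toLong calls; presumably it lets a backend with a different counter encoding plug in its own decoder for degree and count values. A hypothetical example:

// Hypothetical alternative decoder; the default simply reads an 8-byte big-endian long.
val textualLongDecoder: (Array[Byte], Int) => Long =
  (bytes, offset) => new String(bytes, offset, bytes.length - offset, "UTF-8").trim.toLong

val deserializer = new IndexEdgeDeserializable(textualLongDecoder)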
