http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala deleted file mode 100644 index 337ed3f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala +++ /dev/null @@ -1,255 +0,0 @@ -package com.kakao.s2graph.core.storage.hbase - -import java.util -import java.util.concurrent.TimeUnit -import com.google.common.cache.CacheBuilder -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.QueryBuilder -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.{Extensions, logger} -import com.stumbleupon.async.Deferred -import org.apache.hadoop.hbase.util.Bytes -import org.hbase.async.GetRequest -import scala.annotation.tailrec -import scala.collection.JavaConversions._ -import scala.collection.{Map, Seq} -import scala.util.Random -import scala.concurrent.{ExecutionContext, Future} - -class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionContext) - extends QueryBuilder[GetRequest, Deferred[QueryRequestWithResult]](storage) { - - import Extensions.DeferOps - - val maxSize = storage.config.getInt("future.cache.max.size") - val expreAfterWrite = storage.config.getInt("future.cache.expire.after.write") - val expreAfterAccess = storage.config.getInt("future.cache.expire.after.access") - - val futureCache = CacheBuilder.newBuilder() -// .recordStats() - .initialCapacity(maxSize) - .concurrencyLevel(Runtime.getRuntime.availableProcessors()) - .expireAfterWrite(expreAfterWrite, TimeUnit.MILLISECONDS) - .expireAfterAccess(expreAfterAccess, TimeUnit.MILLISECONDS) -// .weakKeys() - .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[QueryRequestWithResult])]() - - // val scheduleTime = 60L * 60 -// val scheduleTime = 60 -// val scheduler = Executors.newScheduledThreadPool(1) -// -// scheduler.scheduleAtFixedRate(new Runnable(){ -// override def run() = { -// logger.info(s"[FutureCache]: ${futureCache.stats()}") -// } -// }, scheduleTime, scheduleTime, TimeUnit.SECONDS) - - override def buildRequest(queryRequest: QueryRequest): GetRequest = { - val srcVertex = queryRequest.vertex - // val tgtVertexOpt = queryRequest.tgtVertexOpt - val edgeCf = HSerializable.edgeCf - - val queryParam = queryRequest.queryParam - val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt - val label = queryParam.label - val labelWithDir = queryParam.labelWithDir - val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) - val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match { - case Some(tgtVertexId) => // _to is given. 
- /** we use toSnapshotEdge so dont need to swap src, tgt */ - val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion) - (src, tgt) - case None => - val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - (src, src) - } - - val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId)) - val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) - val currentTs = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)).toMap - val edge = Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs) - - val get = if (tgtVertexIdOpt.isDefined) { - val snapshotEdge = edge.toSnapshotEdge - val kv = storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.head - new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) - } else { - val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq) - assert(indexedEdgeOpt.isDefined) - - val indexedEdge = indexedEdgeOpt.get - val kv = storage.indexEdgeSerializer(indexedEdge).toKeyValues.head - val table = label.hbaseTableName.getBytes - val rowKey = kv.row - val cf = edgeCf - new GetRequest(table, rowKey, cf) - } - - val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue)) - - get.maxVersions(1) - get.setFailfast(true) - get.setMaxResultsPerColumnFamily(queryParam.limit) - get.setRowOffsetPerColumnFamily(queryParam.offset) - get.setMinTimestamp(minTs) - get.setMaxTimestamp(maxTs) - get.setTimeout(queryParam.rpcTimeoutInMillis) - - if (queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter) - - get - } - - override def getEdge(srcVertex: Vertex, tgtVertex: Vertex, queryParam: QueryParam, isInnerCall: Boolean): Deferred[QueryRequestWithResult] = { - //TODO: - val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(tgtVertex.innerId)) - val q = Query.toQuery(Seq(srcVertex), _queryParam) - val queryRequest = QueryRequest(q, 0, srcVertex, _queryParam) - fetch(queryRequest, 1.0, isInnerCall = true, parentEdges = Nil) - } - - override def fetch(queryRequest: QueryRequest, - prevStepScore: Double, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = { - @tailrec - def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { - if (range < sampleNumber || set.size == sampleNumber) set - else randomInt(sampleNumber, range, set + Random.nextInt(range)) - } - - def sample(edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { - if (edges.size <= n){ - edges - }else{ - val plainEdges = if (queryRequest.queryParam.offset == 0) { - edges.tail - } else edges - - val randoms = randomInt(n, plainEdges.size) - var samples = List.empty[EdgeWithScore] - var idx = 0 - plainEdges.foreach { e => - if (randoms.contains(idx)) samples = e :: samples - idx += 1 - } - samples - } - - } - def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { - val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } - edgeWithScores.map { edgeWithScore => - edgeWithScore.copy(score = edgeWithScore.score / sum) - } - } - def fetchInner(request: GetRequest) = { - storage.client.get(request) withCallback { kvs => - val edgeWithScores = storage.toEdges(kvs.toSeq, 
queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) - - val normalized = - if (queryRequest.queryParam.shouldNormalize) normalize(edgeWithScores) - else edgeWithScores - - val resultEdgesWithScores = - if (queryRequest.queryParam.sample >= 0 ) sample(normalized, queryRequest.queryParam.sample) - else normalized - - QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores)) - } recoverWith { ex => - logger.error(s"fetchQueryParam failed. fallback return.", ex) - QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) - } - } - def checkAndExpire(request: GetRequest, - cacheKey: Long, - cacheTTL: Long, - cachedAt: Long, - defer: Deferred[QueryRequestWithResult]): Deferred[QueryRequestWithResult] = { - if (System.currentTimeMillis() >= cachedAt + cacheTTL) { - // future is too old. so need to expire and fetch new data from storage. - futureCache.asMap().remove(cacheKey) - val newPromise = new Deferred[QueryRequestWithResult]() - futureCache.asMap().putIfAbsent(cacheKey, (System.currentTimeMillis(), newPromise)) match { - case null => - // only one thread succeed to come here concurrently - // initiate fetch to storage then add callback on complete to finish promise. - fetchInner(request) withCallback { queryRequestWithResult => - newPromise.callback(queryRequestWithResult) - queryRequestWithResult - } - newPromise - case (cachedAt, oldDefer) => oldDefer - } - } else { - // future is not to old so reuse it. - defer - } - } - - val queryParam = queryRequest.queryParam - val cacheTTL = queryParam.cacheTTLInMillis - val request = buildRequest(queryRequest) - - if (cacheTTL <= 0) fetchInner(request) - else { - val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) - val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - - val cacheVal = futureCache.getIfPresent(cacheKey) - cacheVal match { - case null => - // here there is no promise set up for this cacheKey so we need to set promise on future cache. 
- val promise = new Deferred[QueryRequestWithResult]() - val now = System.currentTimeMillis() - val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { - case null => - fetchInner(request) withCallback { queryRequestWithResult => - promise.callback(queryRequestWithResult) - queryRequestWithResult - } - (now, promise) - case oldVal => oldVal - } - checkAndExpire(request, cacheKey, cacheTTL, cachedAt, defer) - case (cachedAt, defer) => - checkAndExpire(request, cacheKey, cacheTTL, cachedAt, defer) - } - } - - } - - - override def toCacheKeyBytes(getRequest: GetRequest): Array[Byte] = { - var bytes = getRequest.key() - Option(getRequest.family()).foreach(family => bytes = Bytes.add(bytes, family)) - Option(getRequest.qualifiers()).foreach { - qualifiers => - qualifiers.filter(q => Option(q).isDefined).foreach { - qualifier => - bytes = Bytes.add(bytes, qualifier) - } - } - // if (getRequest.family() != null) bytes = Bytes.add(bytes, getRequest.family()) - // if (getRequest.qualifiers() != null) getRequest.qualifiers().filter(_ != null).foreach(q => bytes = Bytes.add(bytes, q)) - bytes - } - - - override def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]] = { - val defers: Seq[Deferred[QueryRequestWithResult]] = for { - (queryRequest, prevStepScore) <- queryRequestWithScoreLs - parentEdges <- prevStepEdges.get(queryRequest.vertex.id) - } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges) - - val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers) - grouped withCallback { - queryResults: util.ArrayList[QueryRequestWithResult] => - queryResults.toIndexedSeq - } toFuture - } -}
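For reference, the request-squashing trick that this deleted builder implemented (and that the commit factors out into the reusable DeferCache seen below) boils down to the following pattern: cache the in-flight promise itself under the query's cache key, let putIfAbsent decide which caller actually fires the storage fetch, and evict an entry once its TTL has passed so exactly one caller re-fetches. A minimal, self-contained sketch, using scala.concurrent.Future/Promise in place of Asynchbase's Deferred; the FutureSquashCache name and its signature are illustrative only, not the actual DeferCache API:

import com.google.common.cache.CacheBuilder
import scala.concurrent.{ExecutionContext, Future, Promise}

class FutureSquashCache[V](maxSize: Int)(implicit ec: ExecutionContext) {
  // value = (timestamp when cached, in-flight or completed future), keyed by query hash
  private val cache = CacheBuilder.newBuilder()
    .initialCapacity(maxSize)
    .maximumSize(maxSize)
    .build[java.lang.Long, (Long, Future[V])]()

  def getOrElseUpdate(cacheKey: Long, ttlMillis: Long)(fetch: => Future[V]): Future[V] = {
    val key = java.lang.Long.valueOf(cacheKey)

    def startFetch(): Future[V] = {
      val promise = Promise[V]()
      val now = System.currentTimeMillis()
      // putIfAbsent guarantees only one concurrent caller fires the storage fetch
      cache.asMap().putIfAbsent(key, (now, promise.future)) match {
        case null =>
          promise.completeWith(fetch) // winner: run the real fetch
          promise.future
        case (_, inFlight) => inFlight // loser: piggyback on the winner's future
      }
    }

    cache.getIfPresent(key) match {
      case null => startFetch()
      case (cachedAt, cached) =>
        if (System.currentTimeMillis() >= cachedAt + ttlMillis) {
          // entry is too old: drop it so exactly one caller re-fetches
          cache.asMap().remove(key)
          startFetch()
        } else cached
    }
  }
}

Caching the promise rather than the completed result means a burst of identical queries costs a single HBase RPC instead of one per caller; the checkAndExpire logic above adds the TTL-based eviction on top of that.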
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index a83aacd..7c05aed 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -1,17 +1,13 @@ package com.kakao.s2graph.core.storage.hbase -import java.util -import com.google.common.cache.Cache -import com.kakao.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val} -import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.{Label, LabelMeta} -import com.kakao.s2graph.core.storage.Storage +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.storage._ import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.{Extensions, logger} +import com.kakao.s2graph.core.utils.{FutureCache, DeferCache, Extensions, logger} import com.stumbleupon.async.Deferred -import com.typesafe.config.Config +import com.typesafe.config.{ConfigFactory, Config} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} import org.apache.hadoop.hbase.io.compress.Compression.Algorithm @@ -19,20 +15,20 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} -import org.apache.kafka.clients.producer.ProducerRecord import org.apache.hadoop.security.UserGroupInformation import org.hbase.async._ import scala.collection.JavaConversions._ -import scala.collection.Seq +import scala.collection.{Map, Seq} import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future, duration} -import scala.util.Random import scala.util.hashing.MurmurHash3 +import java.util + object AsynchbaseStorage { - val vertexCf = HSerializable.vertexCf - val edgeCf = HSerializable.edgeCf + val vertexCf = Serializable.vertexCf + val edgeCf = Serializable.edgeCf val emptyKVs = new util.ArrayList[KeyValue]() @@ -63,757 +59,245 @@ object AsynchbaseStorage { } } -class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, Option[Vertex]]) - (implicit ec: ExecutionContext) extends Storage(config) { - - import AsynchbaseStorage._ - // import Extensions.FutureOps +class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext) + extends Storage[Deferred[QueryRequestWithResult]](config) { import Extensions.DeferOps + /** + * Asynchbase client setup. 
* note that we need two clients: one for bulk (withWait=false) and another for withWait=true + */ + val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0"))) val client = AsynchbaseStorage.makeClient(config) - val queryBuilder = new AsynchbaseQueryBuilder(this)(ec) - val mutationBuilder = new AsynchbaseMutationBuilder(this)(ec) - - // val cacheOpt = Option(cache) - val cacheOpt = None - val vertexCacheOpt = Option(vertexCache) private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") private val clients = Seq(client, clientWithFlush) - private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort - val MaxRetryNum = config.getInt("max.retry.number") - val MaxBackOff = config.getInt("max.back.off") - val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") - val FailProb = config.getDouble("hbase.fail.prob") - val LockExpireDuration = config.getInt("lock.expire.time") - - /** - * Serializer/Deserializer - */ - def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = new SnapshotEdgeSerializable(snapshotEdge) - - def indexEdgeSerializer(indexedEdge: IndexEdge) = new IndexEdgeSerializable(indexedEdge) - - def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex) - - val snapshotEdgeDeserializer = new SnapshotEdgeDeserializable - val indexEdgeDeserializer = new IndexEdgeDeserializable - val vertexDeserializer = new VertexDeserializable - - def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = queryBuilder.getEdges(q) - - def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = { - val futures = for { - (srcVertex, tgtVertex, queryParam) <- params - } yield queryBuilder.getEdge(srcVertex, tgtVertex, queryParam, false).toFuture - - Future.sequence(futures) - } - - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { - def fromResult(queryParam: QueryParam, - kvs: Seq[org.hbase.async.KeyValue], - version: String): Option[Vertex] = { - - if (kvs.isEmpty) None - else { - val newKVs = kvs - Option(vertexDeserializer.fromKeyValues(queryParam, newKVs, version, None)) - } - } - - val futures = vertices.map { vertex => - val kvs = vertexSerializer(vertex).toKeyValues - val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, vertexCf) - // get.setTimeout(this.singleGetTimeout.toShort) - get.setFailfast(true) - get.maxVersions(1) - - val cacheKey = MurmurHash3.stringHash(get.toString) - val cacheVal = vertexCache.getIfPresent(cacheKey) - if (cacheVal == null) - client.get(get).toFutureWith(emptyKVs).map { kvs => - fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion) - } + private val emptyKeyValues = new util.ArrayList[KeyValue]() + private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client - else Future.successful(cacheVal) - } + /** Future Cache to squash requests */ + private val futureCache = new DeferCache[QueryResult](config)(ec) - Future.sequence(futures).map { result => result.toList.flatten } - } + /** Simple Vertex Cache */ + private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec) - def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] = { - val edgeFuture = - if (edge.op == GraphUtil.operations("deleteAll")) { - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) - } else { - val strongConsistency = edge.label.consistencyLevel ==
"strong" - if (edge.op == GraphUtil.operations("delete") && !strongConsistency) { - val zkQuorum = edge.label.hbaseZkAddr - val (_, edgeUpdate) = Edge.buildDeleteBulk(None, edge) - val mutations = - mutationBuilder.indexedEdgeMutations(edgeUpdate) ++ - mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++ - mutationBuilder.increments(edgeUpdate) - writeAsyncSimple(zkQuorum, mutations, withWait) - } else { - mutateEdgesInner(Seq(edge), strongConsistency, withWait)(Edge.buildOperation) + /** + * fire rpcs into proper hbase cluster using client and + * return true on all mutation success. otherwise return false. + */ + override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { + if (kvs.isEmpty) Future.successful(true) + else { + val _client = client(withWait) + val futures = kvs.map { kv => + val _defer = kv.operation match { + case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)) + case SKeyValue.Delete => + if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp)) + else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp)) + case SKeyValue.Increment => + _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))) } - } - - val vertexFuture = writeAsyncSimple(edge.label.hbaseZkAddr, - mutationBuilder.buildVertexPutsAsync(edge), withWait) + val future = _defer.withCallback { ret => true }.recoverWith { ex => + logger.error(s"mutation failed. $kv", ex) + false + }.toFuture - Future.sequence(Seq(edgeFuture, vertexFuture)).map(_.forall(identity)) - } - - override def mutateEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { - val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val (deleteAllEdges, edges) = edgeGroup.partition(_.op == GraphUtil.operations("deleteAll")) - - // DeleteAll first - val deleteAllFutures = deleteAllEdges.map { edge => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) - } - - // After deleteAll, process others - lazy val mutateEdgeFutures = edges.toList match { - case head :: tail => - val strongConsistency = edges.head.label.consistencyLevel == "strong" - if (strongConsistency) { - val edgeFuture = mutateEdgesInner(edges, strongConsistency, withWait)(Edge.buildOperation) - - //TODO: decide what we will do on failure on vertex put - val puts = mutationBuilder.buildVertexPutsAsync(head) - val vertexFuture = writeAsyncSimple(head.label.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - } else { - edges.map { edge => mutateEdge(edge, withWait = withWait) } - } - case Nil => Nil + if (withWait) future else Future.successful(true) } - val composed = for { - deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield deleteRet ++ mutateRet - - composed.map(_.forall(identity)) + Future.sequence(futures).map(_.forall(identity)) } - - Future.sequence(mutateEdges) } - def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = { - if (vertex.op == GraphUtil.operations("delete")) { - writeAsyncSimple(vertex.hbaseZkAddr, mutationBuilder.buildDeleteAsync(vertex), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. 
$vertex") - Future.successful(true) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - writeAsyncSimple(vertex.hbaseZkAddr, mutationBuilder.buildPutsAll(vertex), withWait) - } - } - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = { - val _client = if (withWait) clientWithFlush else client - val defers: Seq[Deferred[(Boolean, Long)]] = for { - edge <- edges - } yield { - val edgeWithIndex = edge.edgesWithIndex.head - val countWithTs = edge.propsWithTs(LabelMeta.countSeq) - val countVal = countWithTs.innerVal.toString().toLong - val incr = mutationBuilder.buildIncrementsCountAsync(edgeWithIndex, countVal).head - val request = incr.asInstanceOf[AtomicIncrementRequest] - val defer = _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long => - (true, resultCount.longValue()) - } recoverWith { ex => - logger.error(s"mutation failed. $request", ex) - (false, -1L) - } - if (withWait) defer - else Deferred.fromResult((true, -1L)) + override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = { + val defer = fetchKeyValuesInner(hbaseRpc) + defer.toFuture.map { kvsArr => + kvsArr.map { kv => + implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + } toSeq } - - val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers) - grouped.toFuture.map(_.toSeq) } - private def writeAsyncSimpleRetry(zkQuorum: String, elementRpcs: Seq[HBaseRpc], withWait: Boolean): Future[Boolean] = { - def compute = writeAsyncSimple(zkQuorum, elementRpcs, withWait).flatMap { ret => - if (ret) Future.successful(ret) - else throw FetchTimeoutException("writeAsyncWithWaitRetrySimple") - } - Extensions.retryOnFailure(MaxRetryNum) { - compute - } { - logger.error(s"writeAsyncWithWaitRetrySimple: $elementRpcs") - false - } + /** + * since HBase natively provide CheckAndSet on storage level, implementation becomes simple. + * @param rpc: key value that is need to be stored on storage. + * @param expectedOpt: last valid value for rpc's KeyValue.value from fetching. + * @return return true if expected value matches and our rpc is successfully applied, otherwise false. + * note that when some other thread modified same cell and have different value on this KeyValue, + * then HBase atomically return false. + */ + override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = { + val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp) + val expected = expectedOpt.map(_.value).getOrElse(Array.empty) + client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture } - private def writeToStorage(_client: HBaseClient, rpc: HBaseRpc): Deferred[Boolean] = { - // logger.debug(s"$rpc") - val defer = rpc match { - case d: DeleteRequest => _client.delete(d) - case p: PutRequest => _client.put(p) - case i: AtomicIncrementRequest => _client.bufferAtomicIncrement(i) - } - defer withCallback { ret => true } recoverWith { ex => - logger.error(s"mutation failed. 
$rpc", ex) - false - } - } - private def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[HBaseRpc], withWait: Boolean): Future[Boolean] = { - val _client = if (withWait) clientWithFlush else client - if (elementRpcs.isEmpty) { - Future.successful(true) - } else { - val defers = elementRpcs.map { rpc => writeToStorage(_client, rpc) } - if (withWait) - Deferred.group(defers).toFuture map { arr => arr.forall(identity) } - else - Future.successful(true) - } - } + /** + * given queryRequest, build storage specific RPC Request. + * In HBase case, we either build Scanner or GetRequest. + * + * IndexEdge layer: + * Tall schema(v4): use scanner. + * Wide schema(label's schema version in v1, v2, v3): use GetRequest with columnRangeFilter + * when query is given with itnerval option. + * SnapshotEdge layer: + * Tall schema(v3, v4): use GetRequest without column filter. + * Wide schema(label's schema version in v1, v2): use GetRequest with columnRangeFilter. + * Vertex layer: + * all version: use GetRequest without column filter. + * @param queryRequest + * @return Scanner or GetRequest with proper setup with StartKey, EndKey, RangeFilter. + */ + override def buildRequest(queryRequest: QueryRequest): AnyRef = { + import Serializable._ + val queryParam = queryRequest.queryParam + val label = queryParam.label + val edge = toRequestEdge(queryRequest) - private def writeAsync(zkQuorum: String, elementRpcs: Seq[Seq[HBaseRpc]], withWait: Boolean): Future[Seq[Boolean]] = { - val _client = if (withWait) clientWithFlush else client - if (elementRpcs.isEmpty) { - Future.successful(Seq.empty[Boolean]) + val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + val snapshotEdge = edge.toSnapshotEdge + snapshotEdgeSerializer(snapshotEdge).toKeyValues.head + // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) } else { - val futures = elementRpcs.map { rpcs => - val defers = rpcs.map { rpc => writeToStorage(_client, rpc) } - if (withWait) - Deferred.group(defers).toFuture map { arr => arr.forall(identity) } - else - Future.successful(true) - } - if (withWait) - Future.sequence(futures) - else - Future.successful(elementRpcs.map(_ => true)) - } - } - - private def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[KeyValue])] = { - val labelWithDir = edge.labelWithDir - val queryParam = QueryParam(labelWithDir) - val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId)) - val q = Query.toQuery(Seq(edge.srcVertex), _queryParam) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam) - - client.get(queryBuilder.buildRequest(queryRequest)) withCallback { kvs => - val (edgeOpt, kvOpt) = - if (kvs.isEmpty()) (None, None) - else { - val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, parentEdges = Nil).headOption.map(_.edge) - val _kvOpt = kvs.headOption - (_edgeOpt, _kvOpt) - } - (queryParam, edgeOpt, kvOpt) - } recoverWith { ex => - logger.error(s"fetchQueryParam failed. 
fallback return.", ex) - throw new FetchTimeoutException(s"${edge.toLogString}") - } toFuture - } - - - case class PartialFailureException(edge: Edge, statusCode: Byte, faileReason: String) extends Exception - - def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") - logger.debug(msg) - } - - def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { - val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", - s"${edgeMutate.toLogString}").mkString("\n") - logger.debug(msg) - } + val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq) + assert(indexedEdgeOpt.isDefined) - private def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[KeyValue]) = { - val currentTs = System.currentTimeMillis() - val lockTs = snapshotEdgeOpt match { - case None => Option(currentTs) - case Some(snapshotEdge) => - snapshotEdge.pendingEdgeOpt match { - case None => Option(currentTs) - case Some(pendingEdge) => pendingEdge.lockTs - } + val indexedEdge = indexedEdgeOpt.get + indexEdgeSerializer(indexedEdge).toKeyValues.head } - val newVersion = kvOpt.map(_.timestamp()).getOrElse(edge.ts) + 1 - // snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1 - val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs) - val base = snapshotEdgeOpt match { - case None => - // no one ever mutated on this snapshotEdge. - edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - case Some(snapshotEdge) => - // there is at least one mutation have been succeed. - snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) - } - base.copy(version = newVersion, statusCode = 1, lockTs = None) - } - private def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge, - edgeMutate: EdgeMutate) = { - val newVersion = lockEdge.version + 1 - val base = edgeMutate.newSnapshotEdge match { - case None => - // shouldReplace false - assert(snapshotEdgeOpt.isDefined) - snapshotEdgeOpt.get.toSnapshotEdge - case Some(newSnapshotEdge) => newSnapshotEdge - } - base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None) - } + val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue)) + val get = + if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) + else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf) - def mutate(predicate: Boolean, - edge: Edge, - statusCode: Byte, - _edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) throw new PartialFailureException(edge, 1, "predicate failed.") + get.maxVersions(1) + get.setFailfast(true) + get.setMaxResultsPerColumnFamily(queryParam.limit) + get.setRowOffsetPerColumnFamily(queryParam.offset) + get.setMinTimestamp(minTs) + get.setMaxTimestamp(maxTs) + get.setTimeout(queryParam.rpcTimeoutInMillis) - if (statusCode >= 2) { - logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p") - else - writeAsyncSimple(edge.label.hbaseZkAddr, mutationBuilder.indexedEdgeMutations(_edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate) - } else { - throw new PartialFailureException(edge, 1, "hbase fail.") - } - true - } - } - } + if 
(queryParam.columnRangeFilter != null) get.setFilter(queryParam.columnRangeFilter) - def increment(predicate: Boolean, - edge: Edge, - statusCode: Byte, _edgeMutate: EdgeMutate): Future[Boolean] = { - if (!predicate) throw new PartialFailureException(edge, 2, "predicate failed.") - if (statusCode >= 3) { - logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p") - else - writeAsyncSimple(edge.label.hbaseZkAddr, mutationBuilder.increments(_edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate) - } else { - throw new PartialFailureException(edge, 2, "hbase fail.") - } - true - } - } + get } - def acquireLock(statusCode: Byte, edge: Edge, - lockEdge: SnapshotEdge, oldBytes: Array[Byte]): Future[Boolean] = - if (statusCode >= 1) { - logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}") - Future.successful(true) - } else { - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p") - else { - val lockEdgePut = toPutRequest(lockEdge) - client.compareAndSet(lockEdgePut, oldBytes).toFuture.recoverWith { - case ex: Exception => - logger.error(s"AcquireLock RPC Failed.") - throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed") - }.map { ret => - if (ret) { - val log = Seq( - "\n", - "=" * 50, - s"[Success]: acquireLock", - s"[RequestEdge]: ${edge.toLogString}", - s"[LockEdge]: ${lockEdge.toLogString()}", - s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", - "=" * 50, "\n").mkString("\n") - - logger.debug(log) - // debug(ret, "acquireLock", edge.toSnapshotEdge) - } else { - throw new PartialFailureException(edge, 0, "hbase fail.") - } - true - } - } - } + /** + * we use the future cache to squash requests for the same key on storage. + * + * @param queryRequest + * @param prevStepScore + * @param isInnerCall + * @param parentEdges + * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future.
+ * seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback + */ + override def fetch(queryRequest: QueryRequest, + prevStepScore: Double, + isInnerCall: Boolean, + parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = { + + def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = { + fetchKeyValuesInner(hbaseRpc).withCallback { kvs => + val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) + val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) { + sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + } else edgeWithScores + QueryResult(resultEdgesWithScores) + // QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) - def releaseLock(predicate: Boolean, - edge: Edge, - lockEdge: SnapshotEdge, - releaseLockEdge: SnapshotEdge, - _edgeMutate: EdgeMutate, - oldBytes: Array[Byte]): Future[Boolean] = { - if (!predicate) { - throw new PartialFailureException(edge, 3, "predicate failed.") - } - val p = Random.nextDouble() - if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p") - else { - val releaseLockEdgePut = toPutRequest(releaseLockEdge) - val lockEdgePut = toPutRequest(lockEdge) - - client.compareAndSet(releaseLockEdgePut, lockEdgePut.value()).toFuture.recoverWith { - case ex: Exception => - logger.error(s"ReleaseLock RPC Failed.") - throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed") - }.map { ret => - if (ret) { - debug(ret, "releaseLock", edge.toSnapshotEdge) - } else { - val msg = Seq("\nFATAL ERROR\n", - "=" * 50, - oldBytes.toList, - lockEdgePut.value.toList, - releaseLockEdgePut.value().toList, - "=" * 50, - "\n" - ) - logger.error(msg.mkString("\n")) - // error(ret, "releaseLock", edge.toSnapshotEdge) - throw new PartialFailureException(edge, 3, "hbase fail.") - } - true + } recoverWith { ex => + logger.error(s"fetchInner failed. fallback return. 
$hbaseRpc}", ex) + QueryResult(isFailure = true) + // QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) } } - // } - } - - private def toPutRequest(snapshotEdge: SnapshotEdge): PutRequest = { - mutationBuilder.buildPutAsync(snapshotEdge).head.asInstanceOf[PutRequest] - } + val queryParam = queryRequest.queryParam + val cacheTTL = queryParam.cacheTTLInMillis + val request = buildRequest(queryRequest) - private def commitUpdate(edge: Edge, - statusCode: Byte)(snapshotEdgeOpt: Option[Edge], - kvOpt: Option[KeyValue], - edgeUpdate: EdgeMutate): Future[Boolean] = { - val label = edge.label - def oldBytes = kvOpt.map(_.value()).getOrElse(Array.empty) - // def oldBytes = snapshotEdgeOpt.map { e => - // snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head.value - // }.getOrElse(Array.empty) - def process(lockEdge: SnapshotEdge, - releaseLockEdge: SnapshotEdge, - _edgeMutate: EdgeMutate, - statusCode: Byte): Future[Boolean] = { - - for { - locked <- acquireLock(statusCode, edge, lockEdge, oldBytes) - mutated <- mutate(locked, edge, statusCode, _edgeMutate) - incremented <- increment(mutated, edge, statusCode, _edgeMutate) - released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, _edgeMutate, oldBytes) - } yield { - released + val defer = + if (cacheTTL <= 0) fetchInner(request) + else { + val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) + val cacheKey = queryParam.toCacheKey(cacheKeyBytes) + futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) } - } - - - val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt) - val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate) - snapshotEdgeOpt match { - case None => - // no one ever did success on acquire lock. - process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) - case Some(snapshotEdge) => - // someone did success on acquire lock at least one. 
- snapshotEdge.pendingEdgeOpt match { - case None => - // not locked - process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) - case Some(pendingEdge) => - def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() - if (isLockExpired) { - val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) - val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge)) - val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt) - val _newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate) - - // set lock ts as current ts - val newPendingEdgeOpt = _newReleaseLockEdge.pendingEdgeOpt.map(_.copy(lockTs = Option(System.currentTimeMillis()))) - val newReleaseLockEdge = _newReleaseLockEdge.copy(pendingEdgeOpt = newPendingEdgeOpt) - - process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret => - val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}" - throw new PartialFailureException(edge, 0, log) - } - } else { - // locked - if (pendingEdge.ts == edge.ts && statusCode > 0) { - // self locked - val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) - val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(edge)) - val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, newEdgeUpdate) - - /** lockEdge will be ignored */ - process(lockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode) - } else { - throw new PartialFailureException(edge, statusCode, s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]") - } - } - } - } + defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)} } - private def mutateEdgesInner(edges: Seq[Edge], - checkConsistency: Boolean, - withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => (Edge, EdgeMutate)): Future[Boolean] = { - if (!checkConsistency) { - val zkQuorum = edges.head.label.hbaseZkAddr - val futures = edges.map { edge => - val (_, edgeUpdate) = f(None, Seq(edge)) - val mutations = - mutationBuilder.indexedEdgeMutations(edgeUpdate) ++ - mutationBuilder.snapshotEdgeMutations(edgeUpdate) ++ - mutationBuilder.increments(edgeUpdate) - writeAsyncSimple(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => rets.forall(identity) } - } else { - def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = { - - fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => - - val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges) - //shouldReplace false. - if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) { - logger.debug(s"${newEdge.toLogString} drop.") - Future.successful(true) - } else { - commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, edgeUpdate).map { ret => - if (ret) { - logger.info(s"[Success] commit: \n${_edges.map(_.toLogString).mkString("\n")}") - } else { - throw new PartialFailureException(newEdge, 3, "commit failed.") - } - true - } - } - } - } - def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: (Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = { - if (tryNum >= MaxRetryNum) { - edges.foreach { edge => - logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") - ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = edge)) - } - Future.successful(false) - } else { - val future = fn(edges, statusCode) - future.onSuccess { - case success => - logger.debug(s"Finished. 
[$tryNum]\n${edges.head.toLogString}\n") - } - future recoverWith { - case FetchTimeoutException(retryEdge) => - logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") - retry(tryNum + 1)(edges, statusCode)(fn) - - case PartialFailureException(retryEdge, failedStatusCode, faileReason) => - val status = failedStatusCode match { - case 0 => "AcquireLock failed." - case 1 => "Mutation failed." - case 2 => "Increment failed." - case 3 => "ReleaseLock failed." - case 4 => "Unknown" - } - - Thread.sleep(Random.nextInt(MaxBackOff)) - logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") - retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn) - case ex: Exception => - logger.error("Unknown exception", ex) - Future.successful(false) - } - } - } - retry(1)(edges, 0)(commit) - } - } + override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)], + prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = { + val defers: Seq[Deferred[QueryRequestWithResult]] = for { + (queryRequest, prevStepScore) <- queryRequestWithScoreLs + parentEdges <- prevStepEdges.get(queryRequest.vertex.id) + } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges) - def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge], - newEdge: Edge, edgeMutate: EdgeMutate) = { - Seq("----------------------------------------------", - s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", - s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", - s"newEdge: ${newEdge.toLogString}", - s"mutation: \n${edgeMutate.toLogString}", - "----------------------------------------------").mkString("\n") + val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers) + grouped withCallback { + queryResults: util.ArrayList[QueryRequestWithResult] => + queryResults.toIndexedSeq + } toFuture } - private def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest, - queryResult: QueryResult, - requestTs: Long, - retryNum: Int): Future[Boolean] = { - val queryParam = queryRequest.queryParam - val zkQuorum = queryParam.label.hbaseZkAddr - val futures = for { - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } yield { - /** reverted direction */ - val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexedEdge => - mutationBuilder.buildDeletesAsync(indexedEdge) ++ mutationBuilder.buildIncrementsAsync(indexedEdge, -1L) - } - val reversedSnapshotEdgeMutations = mutationBuilder.buildDeleteAsync(edge.toSnapshotEdge) - val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexedEdge => - mutationBuilder.buildDeletesAsync(indexedEdge) ++ mutationBuilder.buildIncrementsAsync(indexedEdge, -1L) - } - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - writeAsyncSimple(zkQuorum, mutations, withWait = true) - } - Future.sequence(futures).map { rets => rets.forall(identity) } - } + def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request) - private def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get - val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore => - (edgeWithScore.edge.ts < requestTs) && 
!edgeWithScore.edge.isDegree - }.map { edgeWithScore => - val label = queryRequest.queryParam.label - val newPropsWithTs = edgeWithScore.edge.propsWithTs ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) - val copiedEdge = edgeWithScore.edge.copy(op = GraphUtil.operations("delete"), version = requestTs, - propsWithTs = newPropsWithTs) - edgeWithScore.copy(edge = copiedEdge) - } - queryResult.copy(edgeWithScoreLs = edgeWithScoreLs) - } - private def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = { - val queryResultLs = queryRequestWithResultLs.map(_.queryResult) - queryResultLs.foreach { queryResult => - if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.") - } - - val futures = for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs) - if deleteQueryResult.edgeWithScoreLs.nonEmpty + /** + * when withWait is given, we use client with flushInterval set to 0. + * if we are not using this, then we are adding extra wait time as much as flushInterval in worst case. + * + * @param edges + * @param withWait + * @return + */ + override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = { + val _client = client(withWait) + val defers: Seq[Deferred[(Boolean, Long)]] = for { + edge <- edges } yield { - val label = queryRequest.queryParam.label - label.schemaVersion match { - case HBaseType.VERSION3 if label.consistencyLevel == "strong" => - - /** - * read: snapshotEdge on queryResult = O(N) - * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) - */ - mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) - case _ => - - /** - * read: x - * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) - */ - deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) + val edgeWithIndex = edge.edgesWithIndex.head + val countWithTs = edge.propsWithTs(LabelMeta.countSeq) + val countVal = countWithTs.innerVal.toString().toLong + val incr = buildIncrementsCountAsync(edgeWithIndex, countVal).head + val request = incr.asInstanceOf[AtomicIncrementRequest] + _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long => + (true, resultCount.longValue()) + } recoverWith { ex => + logger.error(s"mutation failed. $request", ex) + (false, -1L) + } } - } - if (futures.isEmpty) { - // all deleted. 
- Future.successful(true -> true) - } else { - Future.sequence(futures).map { rets => false -> rets.forall(identity) } - } - } - - def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = { - val future = for { - queryRequestWithResultLs <- getEdges(query) - (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs) - } yield { - (allDeleted, ret) - } - Extensions.retryOnFailure(MaxRetryNum) { - future - } { - logger.error(s"fetch and deleteAll failed.") - (true, false) - } + val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers) + grouped.toFuture.map(_.toSeq) } - def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], - labels: Seq[Label], - dir: Int, - ts: Long): Future[Boolean] = { - - def enqueueLogMessage() = { - val kafkaMessages = for { - vertice <- srcVertices - id = vertice.innerId.toIdString() - label <- labels - } yield { - val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") - val topic = ExceptionHandler.failTopic - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg - } - - ExceptionHandler.enqueues(kafkaMessages) - } - val requestTs = ts - val queryParams = for { - label <- labels - } yield { - val labelWithDir = LabelWithDirection(label.id.get, dir) - QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw)) - } - - val step = Step(queryParams.toList) - val q = Query(srcVertices, Vector(step)) - - // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { - val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) { - fetchAndDeleteAll(q, requestTs) - } { case (allDeleted, deleteSuccess) => - allDeleted && deleteSuccess - }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } - - retryFuture onFailure { - case ex => - logger.error(s"[Error]: deleteAllAdjacentEdges failed.") - enqueueLogMessage() - } - - retryFuture - } - - def flush(): Unit = clients.foreach { client => + override def flush(): Unit = clients.foreach { client => val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS) Await.result(client.flush().toFuture, timeout) } - def createTable(zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String): Unit = { + override def createTable(zkAddr: String, + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String): Unit = { logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") val admin = getAdmin(zkAddr) val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier @@ -847,6 +331,77 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, } } + + /** Asynchbase implementation override default getVertices to use future Cache */ + override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { + def fromResult(queryParam: QueryParam, + kvs: Seq[SKeyValue], + version: String): Option[Vertex] = { + + if (kvs.isEmpty) None + else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None) + } + + val futures = vertices.map { vertex => + val kvs = vertexSerializer(vertex).toKeyValues + val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) + // get.setTimeout(this.singleGetTimeout.toShort) + get.setFailfast(true) + 
get.maxVersions(1) + + val cacheKey = MurmurHash3.stringHash(get.toString) + vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs => + fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion) + } + } + + Future.sequence(futures).map { result => result.toList.flatten } + } + + + + + + /** + * Private Methods which is specific to Asynchbase implementation. + */ + private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = { + rpc match { + case getRequest: GetRequest => client.get(getRequest) + case scanner: Scanner => + scanner.nextRows().withCallback { kvsLs => + val ls = new util.ArrayList[KeyValue] + if (kvsLs == null) { + + } else { + kvsLs.foreach { kvs => + if (kvs != null) kvs.foreach { kv => ls.add(kv) } + else { + + } + } + } + scanner.close() + ls + }.recoverWith { ex => + logger.error(s"fetchKeyValuesInner failed.", ex) + scanner.close() + emptyKeyValues + } + case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc")) + } + } + + private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = { + hbaseRpc match { + case getRequest: GetRequest => getRequest.key() + case scanner: Scanner => scanner.getCurrentKey() + case _ => + logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc") + Array.empty[Byte] + } + } + private def getSecureClusterAdmin(zkAddr: String) = { val jaas = config.getString("java.security.auth.login.config") val krb5Conf = config.getString("java.security.krb5.conf") @@ -855,6 +410,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, val keytab = config.getString("keytab") + System.setProperty("java.security.auth.login.config", jaas) System.setProperty("java.security.krb5.conf", krb5Conf) // System.setProperty("sun.security.krb5.debug", "true") @@ -922,5 +478,4 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, private def getEndKey(regionCount: Int): Array[Byte] = { Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala deleted file mode 100644 index 25fe642..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HDeserializable.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.kakao.s2graph.core.storage.hbase - -import com.kakao.s2graph.core.storage.{SKeyValue, StorageDeserializable} -import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId, VertexId} -import org.apache.hadoop.hbase.util.Bytes - - -trait HDeserializable[E] extends StorageDeserializable[E] { - import StorageDeserializable._ - - type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int) - - /** version 1 and version 2 share same code for parsing row key part */ - def parseRow(kv: SKeyValue, version: String): RowKeyRaw = { - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - - val rowLen = srcIdLen + 4 + 1 
- (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala deleted file mode 100644 index cc21cba..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/HSerializable.scala +++ /dev/null @@ -1,10 +0,0 @@ -package com.kakao.s2graph.core.storage.hbase - -import com.kakao.s2graph.core.storage.StorageSerializable - -object HSerializable { - val vertexCf = "v".getBytes() - val edgeCf = "e".getBytes() -} - -trait HSerializable[E] extends StorageSerializable[E] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala deleted file mode 100644 index 64ac1cf..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeDeserializable.scala +++ /dev/null @@ -1,116 +0,0 @@ -package com.kakao.s2graph.core.storage.hbase - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, SKeyValue} -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.logger -import org.apache.hadoop.hbase.util.Bytes - -class IndexEdgeDeserializable extends HDeserializable[IndexEdge] { - - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - val degree = Bytes.toLong(kv.value) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - 
(Array.empty[(Byte, InnerValLike)], 0) - } - - - - /** version 1 and version 2 is same logic */ - override def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[IndexEdge] = None): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, version)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) - else parseQualifier(kv, version) - - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { - val countVal = Bytes.toLong(kv.value) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) - } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) - } else { - parseValue(kv, version) - } - - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException("invalid index seq")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } - - val idxPropsMap = idxProps.toMap - val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - -// logger.error(s"$mergedProps") -// val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala deleted file mode 100644 index ceba4b9..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala +++ /dev/null @@ -1,45 +0,0 @@ -package com.kakao.s2graph.core.storage.hbase - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue} -import com.kakao.s2graph.core.types.VertexId -import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.{GraphUtil, IndexEdge} -import org.apache.hadoop.hbase.util.Bytes - -case class IndexEdgeSerializable(indexEdge: IndexEdge) extends HSerializable[IndexEdge] { - - import StorageSerializable._ - - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = HSerializable.edgeCf - - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) - - /** version 1 and version 2 share same code for serialize row key part */ - override def toKeyValues: Seq[SKeyValue] = { - val srcIdBytes = 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala
deleted file mode 100644
index ceba4b9..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/IndexEdgeSerializable.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
-import com.kakao.s2graph.core.types.VertexId
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
-import org.apache.hadoop.hbase.util.Bytes
-
-case class IndexEdgeSerializable(indexEdge: IndexEdge) extends HSerializable[IndexEdge] {
-
-  import StorageSerializable._
-
-  val label = indexEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = HSerializable.edgeCf
-
-  val idxPropsMap = indexEdge.orders.toMap
-  val idxPropsBytes = propsToBytes(indexEdge.orders)
-
-  /** version 1 and version 2 share the same code for the row key part */
-  override def toKeyValues: Seq[SKeyValue] = {
-    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
-    val labelWithDirBytes = indexEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-//    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
-    val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
-    val qualifier =
-      if (indexEdge.op == GraphUtil.operations("incrementCount")) {
-        Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
-      } else {
-        idxPropsMap.get(LabelMeta.toSeq) match {
-          case None => Bytes.add(idxPropsBytes, tgtIdBytes)
-          case Some(vId) => idxPropsBytes
-        }
-      }
-
-    val value = propsToKeyValues(indexEdge.metas.toSeq)
-    val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
-
-    Seq(kv)
-  }
-}
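The row key assembled above is srcIdBytes ++ labelWithDirBytes ++ one trailing byte that carries both the label index sequence and the isInverted flag. The real packing lives in StorageSerializable.labelOrderSeqWithIsInverted; the sketch below assumes a plausible low-bit layout, purely for illustration:

    object RowKeyByteSketch {
      // Hypothetical packing of the final row-key byte: the label order
      // sequence in the upper bits, the isInverted flag in the low bit.
      // This is an assumption for illustration, not the verified layout.
      def pack(labelOrderSeq: Byte, isInverted: Boolean): Byte =
        ((labelOrderSeq << 1) | (if (isInverted) 1 else 0)).toByte

      def unpack(b: Byte): (Byte, Boolean) =
        (((b & 0xff) >> 1).toByte, (b & 1) == 1)

      def main(args: Array[String]): Unit = {
        val packed = pack(3.toByte, isInverted = true)
        assert(unpack(packed) == (3.toByte, true))
      }
    }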
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala
deleted file mode 100644
index 97f63e4..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeDeserializable.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
-import com.kakao.s2graph.core.storage.{CanSKeyValue, SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-class SnapshotEdgeDeserializable extends HDeserializable[SnapshotEdge] {
-
-  import StorageDeserializable._
-
-  override def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    queryParam.label.schemaVersion match {
-      case HBaseType.VERSION3 => fromKeyValuesInnerV3(queryParam, _kvs, version, cacheElementOpt)
-      case _ => fromKeyValuesInner(queryParam, _kvs, version, cacheElementOpt)
-    }
-  }
-
-  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
-    val statusCode = byte >> 4
-    val op = byte & ((1 << 4) - 1)
-    (statusCode.toByte, op.toByte)
-  }
-
-  private def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
-
-    val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
-    val cellVersion = kv.timestamp
-
-    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRow(kv, schemaVer))
-
-    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
-      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-      val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
-          pos += 1
-//          val versionNum = Bytes.toLong(kv.value, pos, 8)
-//          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
-          val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
-          Option(pendingEdge)
-        }
-
-      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
-    }
-
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
-  }
-
-  private def fromKeyValuesInnerV3[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
-
-    val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
-    val cellVersion = kv.timestamp
-    /** rowKey */
-    def parseRowV3(kv: SKeyValue, version: String) = {
-      var pos = 0
-      val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
-      pos += srcIdAndTgtIdLen
-      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-      pos += 4
-      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-
-      val rowLen = srcIdAndTgtIdLen + 4 + 1
-      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-    }
-    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRowV3(kv, schemaVer))
-
-    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
-    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
-
-    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-      val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
-
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
-          pos += 1
-//          val versionNum = Bytes.toLong(kv.value, pos, 8)
-//          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
-
-          val pendingEdge =
-            Edge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
-          Option(pendingEdge)
        }
-
-      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
-    }
-
-    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
-  }
-}
-
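statusCodeWithOp above unpacks a single header byte: the status code sits in the high nibble and the operation in the low nibble, mirroring the serializer's statusCodeWithOp(statusCode, op) shown in the next file. A standalone round-trip sketch; the extra & 0x0f mask on unpack simply guards against sign extension:

    object StatusOpByteSketch {
      // Pack: status code in the high nibble, op in the low nibble.
      def pack(statusCode: Byte, op: Byte): Byte = ((statusCode << 4) | op).toByte

      // Unpack: reverse the packing; masking avoids sign-extension surprises.
      def unpack(b: Byte): (Byte, Byte) =
        (((b >> 4) & 0x0f).toByte, (b & 0x0f).toByte)

      def main(args: Array[String]): Unit = {
        val header = pack(2.toByte, 5.toByte)
        assert(unpack(header) == (2.toByte, 5.toByte))
      }
    }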
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala
deleted file mode 100644
index a9e77dc..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/SnapshotEdgeSerializable.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import java.util.UUID
-
-import com.kakao.s2graph.core.SnapshotEdge
-import com.kakao.s2graph.core.mysqls.LabelIndex
-import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue}
-import com.kakao.s2graph.core.types.{HBaseType, SourceAndTargetVertexIdPair, VertexId}
-import com.kakao.s2graph.core.utils.logger
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.util.Random
-
-class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends HSerializable[SnapshotEdge] {
-  import StorageSerializable._
-
-  val label = snapshotEdge.label
-  val table = label.hbaseTableName.getBytes()
-  val cf = HSerializable.edgeCf
-
-  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
-    val byte = (((statusCode << 4) | op).toByte)
-    Array.fill(1)(byte.toByte)
-  }
-
-  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
-    propsToKeyValuesWithTs(snapshotEdge.props.toList))
-
-  override def toKeyValues: Seq[SKeyValue] = {
-    label.schemaVersion match {
-      case HBaseType.VERSION3 => toKeyValuesInnerV3
-      case _ => toKeyValuesInner
-    }
-  }
-
-  private def toKeyValuesInner: Seq[SKeyValue] = {
-    val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
-    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
-    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-    val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
-
-    val qualifier = tgtIdBytes
-
-    val value = snapshotEdge.pendingEdgeOpt match {
-      case None => valueBytes()
-      case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
-        val versionBytes = Array.empty[Byte]
-//          Bytes.toBytes(snapshotEdge.version)
-        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
-        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-//          Array.empty[Byte]
-//          snapshotEdge.lockedAtOpt.map(lockedAt => Bytes.toBytes(lockedAt)).getOrElse(Array.empty[Byte])
-        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
    }
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
-
-  private def toKeyValuesInnerV3: Seq[SKeyValue] = {
-    val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
-    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
-    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
-
-    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
-    val qualifier = Array.empty[Byte]
-
-    val value = snapshotEdge.pendingEdgeOpt match {
-      case None => valueBytes()
-      case Some(pendingEdge) =>
-        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
-        val versionBytes = Array.empty[Byte]
-//          Bytes.toBytes(snapshotEdge.version)
-        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
-        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
-//          Array.empty[Byte]
-//          snapshotEdge.lockedAtOpt.map(lockedAt => Bytes.toBytes(lockedAt)).getOrElse(Array.empty[Byte])
-//          logger.error(s"ValueBytes: ${valueBytes().toList}")
-//          logger.error(s"opBytes: ${opBytes.toList}")
-//          logger.error(s"versionBytes: ${versionBytes.toList}")
-//          logger.error(s"PropsBytes: ${propsBytes.toList}")
-//          logger.error(s"LockBytes: ${lockBytes.toList}")
-        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
    }
-
-    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
-    Seq(kv)
-  }
-}
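As a rough picture of the value bytes built above: valueBytes() is one status/op byte followed by the encoded propsWithTs, and a pending edge appends its own status/op byte, its props, and an 8-byte lockTs. The byte arrays below are placeholders standing in for the real prop encoders:

    import java.nio.ByteBuffer

    object SnapshotValueLayoutSketch {
      // Concatenate sections of the snapshot-edge value, in order.
      def concat(parts: Array[Byte]*): Array[Byte] = parts.flatten.toArray

      def main(args: Array[String]): Unit = {
        val statusOp = Array(0x15.toByte)          // status 1, op 5
        val propsWithTs = Array[Byte](1, 2, 3)     // placeholder for encoded props
        val pendingStatusOp = Array(0x26.toByte)   // pending edge's status/op byte
        val pendingProps = Array[Byte](4, 5)       // placeholder for pending props
        val lockTs = ByteBuffer.allocate(8).putLong(99L).array()

        val value = concat(statusOp, propsWithTs, pendingStatusOp, pendingProps, lockTs)
        assert(value.length == 1 + 3 + 1 + 2 + 8)
      }
    }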
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala
deleted file mode 100644
index 4c0ca19..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexDeserializable.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.storage.CanSKeyValue
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import com.kakao.s2graph.core.{QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.collection.mutable.ListBuffer
-
-class VertexDeserializable extends HDeserializable[Vertex] {
-  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam,
-                                     _kvs: Seq[T],
-                                     version: String,
-                                     cacheElementOpt: Option[Vertex]): Vertex = {
-
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-    val kv = kvs.head
-    val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
-
-    var maxTs = Long.MinValue
-    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
-    val belongLabelIds = new ListBuffer[Int]
-
-    for {
-      kv <- kvs
-    } {
-      val propKey =
-        if (kv.qualifier.length == 1) kv.qualifier.head.toInt
-        else Bytes.toInt(kv.qualifier)
-
-      val ts = kv.timestamp
-      if (ts > maxTs) maxTs = ts
-
-      if (Vertex.isLabelId(propKey)) {
-        belongLabelIds += Vertex.toLabelId(propKey)
-      } else {
-        val v = kv.value
-        val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
-        propsMap += (propKey -> value)
-      }
-    }
-    assert(maxTs != Long.MinValue)
-    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
  }
-}
-
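The qualifier decoding above supports two widths: a single byte for small meta sequences, or a 4-byte big-endian int. A standalone equivalent, with ByteBuffer standing in for HBase's Bytes.toInt:

    import java.nio.ByteBuffer

    object VertexPropKeySketch {
      // One-byte qualifiers decode directly; wider ones are big-endian ints.
      def propKey(qualifier: Array[Byte]): Int =
        if (qualifier.length == 1) qualifier.head.toInt
        else ByteBuffer.wrap(qualifier).getInt

      def main(args: Array[String]): Unit = {
        assert(propKey(Array(7.toByte)) == 7)
        assert(propKey(ByteBuffer.allocate(4).putInt(1234).array()) == 1234)
      }
    }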
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala
deleted file mode 100644
index 370b844..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/VertexSerializable.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core.Vertex
-import com.kakao.s2graph.core.storage.SKeyValue
-import org.apache.hadoop.hbase.util.Bytes
-
-case class VertexSerializable(vertex: Vertex) extends HSerializable[Vertex] {
-
-  val cf = HSerializable.vertexCf
-
-  override def toKeyValues: Seq[SKeyValue] = {
-    val row = vertex.id.bytes
-    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
-    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
-    (base ++ belongsTo).map { case (qualifier, value) =>
-      SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
-    }.toSeq
-  }
-}
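So a vertex fans out to one cell per property, plus an empty-valued marker cell per belongs-to label id. A simplified sketch with Int keys and raw byte values standing in for s2graph's typed props:

    import java.nio.ByteBuffer

    object VertexCellsSketch {
      def intQualifier(k: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(k).array()

      // One (qualifier, value) cell per property; belongs-to labels become
      // marker cells whose value is empty.
      def toCells(props: Map[Int, Array[Byte]], belongLabelPropKeys: Seq[Int]): Seq[(Array[Byte], Array[Byte])] = {
        val base = props.toSeq.map { case (k, v) => intQualifier(k) -> v }
        val markers = belongLabelPropKeys.map { k => intQualifier(k) -> Array.empty[Byte] }
        base ++ markers
      }

      def main(args: Array[String]): Unit = {
        val cells = toCells(Map(1 -> Array[Byte](9)), Seq(100))
        assert(cells.size == 2 && cells.last._2.isEmpty)
      }
    }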
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
new file mode 100644
index 0000000..6777c28
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
@@ -0,0 +1,82 @@
+package com.kakao.s2graph.core.utils
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.CacheBuilder
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.Config
+
+import scala.concurrent.ExecutionContext
+
+class DeferCache[R](config: Config)(implicit ex: ExecutionContext) {
+
+  import com.kakao.s2graph.core.utils.Extensions.DeferOps
+
+  type Value = (Long, Deferred[R])
+
+  private val maxSize = config.getInt("future.cache.max.size")
+  private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
+  private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  private val futureCache = CacheBuilder.newBuilder()
+    .initialCapacity(maxSize)
+    .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+    .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+    .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
+    .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]()
+
+
+  def asMap() = futureCache.asMap()
+
+  def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey)
+
+  private def checkAndExpire(cacheKey: Long,
+                             cachedAt: Long,
+                             cacheTTL: Long,
+                             oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = {
+    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
+      // the future is too old, so expire it and fetch fresh data from storage.
+      futureCache.asMap().remove(cacheKey)
+
+      val newPromise = new Deferred[R]()
+      val now = System.currentTimeMillis()
+
+      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
+        case null =>
+          // only one thread succeeds in getting here concurrently:
+          // initiate the fetch to storage, then add a callback to complete the promise.
+          op withCallback { value =>
+            newPromise.callback(value)
+            value
+          }
+          newPromise
+        case (cachedAt, oldDefer) => oldDefer
+      }
+    } else {
+      // the future is not too old, so reuse it.
+      oldDefer
+    }
+  }
+
+  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = {
+    val cacheVal = futureCache.getIfPresent(cacheKey)
+    cacheVal match {
+      case null =>
+        val promise = new Deferred[R]()
+        val now = System.currentTimeMillis()
+        val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
+          case null =>
+            op.withCallback { value =>
+              promise.callback(value)
+              value
+            }
+            (now, promise)
+          case oldVal => oldVal
+        }
+        checkAndExpire(cacheKey, cachedAt, cacheTTL, defer)(op)
+
+      case (cachedAt, defer) =>
+        checkAndExpire(cacheKey, cachedAt, cacheTTL, defer)(op)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
index 4858f60..eea9a79 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
@@ -2,7 +2,6 @@ package com.kakao.s2graph.core.utils
 
 import com.stumbleupon.async.{Callback, Deferred}
 import com.typesafe.config.Config
-
 import scala.concurrent.{ExecutionContext, Future, Promise}
 
 object Extensions {
@@ -70,5 +69,4 @@ object Extensions {
     def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean =
       if (config.hasPath(key)) config.getBoolean(key) else defaultValue
   }
-
 }
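The getOrElseUpdate of the DeferCache added above is a single-flight cache: putIfAbsent picks one winner to run the fetch while every other caller shares the same in-flight Deferred, and checkAndExpire re-fetches once an entry outlives its TTL. The same idea sketched with plain scala.concurrent Futures; SingleFlightCache is a hypothetical name, not part of this commit:

    import java.util.concurrent.ConcurrentHashMap
    import scala.concurrent.{ExecutionContext, Future, Promise}

    // A minimal single-flight cache sketch in the spirit of DeferCache.
    class SingleFlightCache[R](implicit ec: ExecutionContext) {
      private val cache = new ConcurrentHashMap[Long, (Long, Future[R])]()

      def getOrElseUpdate(key: Long, ttlMillis: Long)(op: => Future[R]): Future[R] = {
        val now = System.currentTimeMillis()
        val promise = Promise[R]()
        cache.putIfAbsent(key, (now, promise.future)) match {
          case null =>
            promise.completeWith(op)   // we won the race: run the fetch once
            promise.future
          case (cachedAt, cached) if now < cachedAt + ttlMillis =>
            cached                     // fresh enough: share the in-flight/cached value
          case _ =>
            cache.remove(key)          // stale: drop the entry and retry once
            getOrElseUpdate(key, ttlMillis)(op)
        }
      }
    }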
