http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala new file mode 100644 index 0000000..4239d15 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.hbase + +import java.util + +import com.stumbleupon.async.Deferred +import com.typesafe.config.Config +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.{HBaseType, VertexId} +import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger} +import org.hbase.async._ + +import scala.concurrent.ExecutionContext + +class AsynchbaseEdgeFetcher(val graph: S2GraphLike, + val config: Config, + val client: HBaseClient, + val serDe: StorageSerDe, + val io: StorageIO) extends EdgeFetcher { + + import AsynchbaseStorage._ + import CanDefer._ + import Extensions.DeferOps + + import scala.collection.JavaConverters._ + + /** Future Cache to squash request */ + lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true) + + override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = { + val defers: Seq[Deferred[StepResult]] = for { + queryRequest <- queryRequests + } yield { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent + val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil + fetch(queryRequest, isInnerCall = false, parentEdges) + } + + val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers.asJava) + grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] => + queryResults + }.toFuture(emptyStepResult).map(_.asScala) + } + + /** + * we are using future cache to squash requests into same key on storage. + * + * @param queryRequest + * @param isInnerCall + * @param parentEdges + * @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future. 
+ * seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback + */ + private def fetch(queryRequest: QueryRequest, + isInnerCall: Boolean, + parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = { + + def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = { + val prevStepScore = queryRequest.prevStepScore + val fallbackFn: (Exception => StepResult) = { ex => + logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex) + StepResult.Failure + } + + val queryParam = queryRequest.queryParam + AsynchbaseStorage.fetchKeyValuesInner(client, hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { _kvs => + val kvs = _kvs.asScala + val (startOffset, len) = queryParam.label.schemaVersion match { + case HBaseType.VERSION4 => + val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset + (offset, queryParam.limit) + case _ => (0, kvs.length) + } + + io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len) + } + } + + val queryParam = queryRequest.queryParam + val cacheTTL = queryParam.cacheTTLInMillis + /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + + val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges) + val request = buildRequest(client, serDe, queryRequest, edge) + + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) + val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes) + + if (cacheTTL <= 0) fetchInner(request) + else { + val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey) + + // val cacheKeyBytes = toCacheKeyBytes(request) + val cacheKey = queryParam.toCacheKey(cacheKeyBytes) + futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) + } + } +}
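The fetch() above squashes concurrent lookups through the DeferCache-backed futureCache: requests that produce the same cache key share one in-flight storage call via futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)), and the cacheTTL <= 0 branch bypasses the cache entirely. Below is a minimal, self-contained sketch of that request-squashing idea using plain scala.concurrent.Future; the SquashingCache name and its API are illustrative only and are not part of S2Graph (the real DeferCache additionally handles TTL expiry, metrics, and Deferred-based results).

    import java.util.concurrent.ConcurrentHashMap
    import scala.concurrent.{ExecutionContext, Future, Promise}

    // Hypothetical sketch: callers that compute the same key share a single
    // in-flight fetch instead of each issuing a duplicate storage RPC.
    class SquashingCache[K, V] {
      private val inFlight = new ConcurrentHashMap[K, Future[V]]()

      def getOrElseUpdate(key: K)(fetch: => Future[V])(implicit ec: ExecutionContext): Future[V] = {
        val promise = Promise[V]()
        val prev = inFlight.putIfAbsent(key, promise.future)
        if (prev != null) prev // another caller already started this fetch; reuse its result
        else {
          fetch.onComplete { result =>
            inFlight.remove(key) // let later requests trigger a fresh fetch
            promise.complete(result)
          }
          promise.future
        }
      }
    }

Under a scheme like this, a hot cache key hit by many concurrent query steps results in a single storage round trip rather than one per caller.
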
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala new file mode 100644 index 0000000..f03310c --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.hbase + +import org.apache.s2graph.core.{QueryRequest, S2EdgeLike} +import org.apache.s2graph.core.storage.{OptimisticEdgeFetcher, SKeyValue, StorageIO, StorageSerDe} +import org.hbase.async.HBaseClient + +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseOptimisticEdgeFetcher(val client: HBaseClient, + val serDe: StorageSerDe, + val io: StorageIO) extends OptimisticEdgeFetcher { + override protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + val request = AsynchbaseStorage.buildRequest(client, serDe, queryRequest, edge) + AsynchbaseStorage.fetchKeyValues(client, request) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala new file mode 100644 index 0000000..8305e04 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.hbase + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.S2GraphLike +import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.utils.{Extensions, logger} +import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest} + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseOptimisticMutator(val graph: S2GraphLike, + val serDe: StorageSerDe, + val optimisticEdgeFetcher: OptimisticEdgeFetcher, + val client: HBaseClient, + val clientWithFlush: HBaseClient) extends OptimisticMutator { + import Extensions.DeferOps + + private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+ /** + * Decides how to store the given key values (Seq[SKeyValue]) into storage using the storage's client. + * Note that this should return true only when every write succeeds. + * We assume that each storage implementation has a client as a member variable. + * + * @param cluster : where these key values should be stored. + * @param kvs : sequence of SKeyValue that needs to be stored in storage. + * @param withWait : flag to control whether to wait for an ack from storage. + * Note that in AsynchbaseStorage (which supports asynchronous operations), even with true + * it never blocks a thread; it submits the work and is notified by the event loop when storage sends the ack back. + * @return ack message from storage. + */
+ override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { + if (kvs.isEmpty) Future.successful(MutateResponse.Success) + else { + val _client = client(withWait) + val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment) + + /* Asynchbase IncrementRequest does not implement HasQualifiers */ + val incrementsFutures = increments.map { kv => + val countVal = Bytes.toLong(kv.value) + val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal) + val fallbackFn: (Exception => MutateResponse) = { ex => + logger.error(s"mutation failed. $request", ex) + new IncrementResponse(false, -1L, -1L) + } + val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long => + new IncrementResponse(true, resultCount.longValue(), countVal) + }.toFuture(MutateResponse.IncrementFailure) + + if (withWait) future else Future.successful(MutateResponse.IncrementSuccess) + } + + /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */ + val othersFutures = putAndDeletes.groupBy { kv => + (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp) + }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) => + + val durability = groupedKeyValues.head.durability + val qualifiers = new ArrayBuffer[Array[Byte]]() + val values = new ArrayBuffer[Array[Byte]]() + + groupedKeyValues.foreach { kv => + if (kv.qualifier != null) qualifiers += kv.qualifier + if (kv.value != null) values += kv.value + } + val defer = operation match { + case SKeyValue.Put => + val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp) + put.setDurable(durability) + _client.put(put) + case SKeyValue.Delete => + val delete = + if (qualifiers.isEmpty) + new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp) + else + new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp) + delete.setDurable(durability) + _client.delete(delete) + } + if (withWait) { + defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception => + groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) } + MutateResponse.Failure + } + } else Future.successful(MutateResponse.Success) + } + for { + incrementRets <- Future.sequence(incrementsFutures) + otherRets <- Future.sequence(othersFutures) + } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess)) + } + }
+ + /** + * Writes requestKeyValue into storage only if the value currently stored matches the expected one. + * Note that we only use the SnapshotEdge as the place for the lock, so this method only changes the SnapshotEdge. + * + * Most importantly, this has to be an 'atomic' operation. + * While this operation is mutating requestKeyValue's snapshotEdge, other threads need to be + * either blocked or failed on a write-write conflict. + * + * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized with it to + * prevent reads of stale data. + * + * It is best to use the storage's concurrency control (either pessimistic or optimistic), such as transactions or + * compareAndSet, to synchronize. + * + * For example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee atomicity. + * For a storage that does not support concurrency control, the storage implementation + * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues) + * and write (writeLock). 
+ * + * @param rpc + * @param expectedOpt + * @return + */ + override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = { + val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp) + val expected = expectedOpt.map(_.value).getOrElse(Array.empty) + client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true) + .map(r => new MutateResponse(isSuccess = r)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 4be3767..f65ee20 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -21,25 +21,40 @@ package org.apache.s2graph.core.storage.hbase import java.util +import java.util.Base64 import java.util.concurrent.{ExecutorService, Executors} +import com.stumbleupon.async.Deferred import com.typesafe.config.Config import org.apache.commons.io.FileUtils +import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.storage.serde._ +import org.apache.s2graph.core.types.{HBaseType, VertexId} import org.apache.s2graph.core.utils._ +import org.hbase.async.FilterList.Operator.MUST_PASS_ALL import org.hbase.async._ -import org.apache.s2graph.core.storage.serde._ + import scala.collection.JavaConversions._ +import scala.concurrent.{ExecutionContext, Future} object AsynchbaseStorage { + import Extensions.DeferOps + import CanDefer._ + val vertexCf = Serializable.vertexCf val edgeCf = Serializable.edgeCf + val emptyKVs = new util.ArrayList[KeyValue]() + val emptyKeyValues = new util.ArrayList[KeyValue]() + val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]() + val emptyStepResult = new util.ArrayList[StepResult]() AsynchbasePatcher.init() + def makeClient(config: Config, overrideKv: (String, String)*) = { val asyncConfig: org.hbase.async.Config = if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { @@ -135,6 +150,161 @@ object AsynchbaseStorage { hbaseExecutor } + + def fetchKeyValues(client: HBaseClient, rpc: AsyncRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + val defer = fetchKeyValuesInner(client, rpc) + defer.toFuture(emptyKeyValues).map { kvsArr => + kvsArr.map { kv => + implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + } + } + } + + def fetchKeyValuesInner(client: HBaseClient, rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = { + rpc match { + case Left(get) => client.get(get) + case Right(ScanWithRange(scanner, offset, limit)) => + val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex => + logger.error(s"fetchKeyValuesInner failed.", ex) + scanner.close() + emptyKeyValues + } + + scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs => + val ls = new util.ArrayList[KeyValue] + if (kvsLs == null) { + } else { + kvsLs.foreach { kvs => + if (kvs != null) kvs.foreach { kv => ls.add(kv) 
} + else { + + } + } + } + + scanner.close() + val toIndex = Math.min(ls.size(), offset + limit) + new util.ArrayList[KeyValue](ls.subList(offset, toIndex)) + } + case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc")) + } + } + + def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = { + /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + hbaseRpc match { + case Left(getRequest) => getRequest.key + case Right(ScanWithRange(scanner, offset, limit)) => + Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))) + case _ => + logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc") + throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") + } + } + + def buildRequest(serDe: StorageSerDe, queryRequest: QueryRequest, vertex: S2VertexLike) = { + val kvs = serDe.vertexSerializer(vertex).toKeyValues + val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) + // get.setTimeout(this.singleGetTimeout.toShort) + get.setFailfast(true) + get.maxVersions(1) + + Left(get) + } + + def buildRequest(client: HBaseClient, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike) = { + val queryParam = queryRequest.queryParam + val label = queryParam.label + + val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + val snapshotEdge = edge.toSnapshotEdge + serDe.snapshotEdgeSerializer(snapshotEdge) + } else { + val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq) + serDe.indexEdgeSerializer(indexEdge) + } + + val rowKey = serializer.toRowKey + val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue)) + + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) + + label.schemaVersion match { + case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty => + val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName) + scanner.setFamily(SKeyValue.EdgeCf) + + /* + * TODO: remove this part. + */ + val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq) + val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) + + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + + val (startKey, stopKey) = + if (queryParam.intervalOpt.isDefined) { + // interval is set. 
+ val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => Bytes.add(baseKey, intervalMaxBytes) + } + (_startKey , Bytes.add(baseKey, intervalMinBytes)) + } else { + /* + * note: since propsToBytes encode size of property map at first byte, we are sure about max value here + */ + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => baseKey + } + (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) + } + + scanner.setStartKey(startKey) + scanner.setStopKey(stopKey) + + if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") + + scanner.setMaxVersions(1) + // TODO: exclusive condition innerOffset with cursorOpt + if (queryParam.cursorOpt.isDefined) { + scanner.setMaxNumRows(queryParam.limit) + } else { + scanner.setMaxNumRows(queryParam.innerOffset + queryParam.innerLimit) + } + scanner.setMaxTimestamp(maxTs) + scanner.setMinTimestamp(minTs) + scanner.setRpcTimeout(queryParam.rpcTimeout) + + // SET option for this rpc properly. + if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit)) + else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit)) + + case _ => + val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf, serializer.toQualifier) + } else { + new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf) + } + + get.maxVersions(1) + get.setFailfast(true) + get.setMinTimestamp(minTs) + get.setMaxTimestamp(maxTs) + get.setTimeout(queryParam.rpcTimeout) + + val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset) + val columnRangeFilterOpt = queryParam.intervalOpt.map { interval => + new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true) + } + get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL)) + Left(get) + } + } } @@ -149,11 +319,21 @@ class AsynchbaseStorage(override val graph: S2GraphLike, val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") val clients = Seq(client, clientWithFlush) - override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients) +// private lazy val _fetcher = new AsynchbaseStorageFetcher(graph, config, client, serDe, io) + + private lazy val optimisticEdgeFetcher = new AsynchbaseOptimisticEdgeFetcher(client, serDe, io) + private lazy val optimisticMutator = new AsynchbaseOptimisticMutator(graph, serDe, optimisticEdgeFetcher, client, clientWithFlush) + private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator) + override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients) override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) - override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) + override val edgeFetcher: EdgeFetcher = new AsynchbaseEdgeFetcher(graph, config, client, serDe, io) + override val edgeBulkFetcher: EdgeBulkFetcher = new AsynchbaseEdgeBulkFetcher(graph, config, client, serDe, io) + override val vertexFetcher: VertexFetcher = new AsynchbaseVertexFetcher(graph, config, client, serDe, io) + override val vertexBulkFetcher: VertexBulkFetcher = new AsynchbaseVertexBulkFetcher(graph, config, client, serDe, io) + + override val edgeMutator: EdgeMutator = _mutator + override val 
vertexMutator: VertexMutator = _mutator - override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala deleted file mode 100644 index 0526042..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * - */ - -package org.apache.s2graph.core.storage.hbase - -import java.util -import java.util.Base64 - -import com.stumbleupon.async.Deferred -import com.typesafe.config.Config -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core._ -import org.apache.s2graph.core.schema.{Label, ServiceColumn} -import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.storage.serde._ -import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange} -import org.apache.s2graph.core.types.{HBaseType, VertexId} -import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger} -import org.hbase.async.FilterList.Operator.MUST_PASS_ALL -import org.hbase.async._ - -import scala.collection.JavaConversions._ -import scala.concurrent.{ExecutionContext, Future} - -class AsynchbaseStorageReadable(val graph: S2GraphLike, - val config: Config, - val client: HBaseClient, - override val serDe: StorageSerDe, - override val io: StorageIO) extends StorageReadable { - import Extensions.DeferOps - import CanDefer._ - - private val emptyKeyValues = new util.ArrayList[KeyValue]() - private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]() - private val emptyStepResult = new util.ArrayList[StepResult]() - - /** Future Cache to squash request */ - lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true) - /** v4 max next row size */ - private val v4_max_num_rows = 10000 - private def getV4MaxNumRows(limit : Int): Int = { - if (limit < v4_max_num_rows) limit - else v4_max_num_rows - } - - /** - * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues. - * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build - * client request(GetRequest, Scanner) based on user provided query. 
- * - * @param queryRequest - * @return - */ - private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike) = { - import Serializable._ - val queryParam = queryRequest.queryParam - val label = queryParam.label - - val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { - val snapshotEdge = edge.toSnapshotEdge - serDe.snapshotEdgeSerializer(snapshotEdge) - } else { - val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq) - serDe.indexEdgeSerializer(indexEdge) - } - - val rowKey = serializer.toRowKey - val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue)) - - val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) - - label.schemaVersion match { - case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty => - val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName) - scanner.setFamily(edgeCf) - - /* - * TODO: remove this part. - */ - val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq) - val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) - - val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes - val labelWithDirBytes = indexEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - - val (startKey, stopKey) = - if (queryParam.intervalOpt.isDefined) { - // interval is set. - val _startKey = queryParam.cursorOpt match { - case Some(cursor) => Base64.getDecoder.decode(cursor) - case None => Bytes.add(baseKey, intervalMaxBytes) - } - (_startKey , Bytes.add(baseKey, intervalMinBytes)) - } else { - /* - * note: since propsToBytes encode size of property map at first byte, we are sure about max value here - */ - val _startKey = queryParam.cursorOpt match { - case Some(cursor) => Base64.getDecoder.decode(cursor) - case None => baseKey - } - (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) - } - - scanner.setStartKey(startKey) - scanner.setStopKey(stopKey) - - if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") - - scanner.setMaxVersions(1) - // TODO: exclusive condition innerOffset with cursorOpt - if (queryParam.cursorOpt.isDefined) { - scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit)) - } else { - scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit)) - } - scanner.setMaxTimestamp(maxTs) - scanner.setMinTimestamp(minTs) - scanner.setRpcTimeout(queryParam.rpcTimeout) - - // SET option for this rpc properly. 
- if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit)) - else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit)) - - case _ => - val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) { - new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier) - } else { - new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf) - } - - get.maxVersions(1) - get.setFailfast(true) - get.setMinTimestamp(minTs) - get.setMaxTimestamp(maxTs) - get.setTimeout(queryParam.rpcTimeout) - - val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset) - val columnRangeFilterOpt = queryParam.intervalOpt.map { interval => - new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true) - } - get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL)) - Left(get) - } - } - - /** - * - * @param queryRequest - * @param vertex - * @return - */ - private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike) = { - val kvs = serDe.vertexSerializer(vertex).toKeyValues - val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) - // get.setTimeout(this.singleGetTimeout.toShort) - get.setFailfast(true) - get.maxVersions(1) - - Left(get) - } - - override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = { - val rpc = buildRequest(queryRequest, edge) - fetchKeyValues(rpc) - } - - override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = { - val rpc = buildRequest(queryRequest, vertex) - fetchKeyValues(rpc) - } - - /** - * responsible to fire parallel fetch call into storage and create future that will return merged result. 
- * - * @param queryRequests - * @param prevStepEdges - * @return - */ - override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = { - val defers: Seq[Deferred[StepResult]] = for { - queryRequest <- queryRequests - } yield { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent - val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil - fetch(queryRequest, isInnerCall = false, parentEdges) - } - - val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers) - grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] => - queryResults.toSeq - }.toFuture(emptyStepResult) - } - - def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = { - val defer = fetchKeyValuesInner(rpc) - defer.toFuture(emptyKeyValues).map { kvsArr => - kvsArr.map { kv => - implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - } - } - } - - override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { - val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => - val distinctLabels = labels.toSet - val scan = AsynchbasePatcher.newScanner(client, hTableName) - scan.setFamily(Serializable.edgeCf) - scan.setMaxVersions(1) - - scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { - case null => Seq.empty - case kvsLs => - kvsLs.flatMap { kvs => - kvs.flatMap { kv => - val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - - serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION) - .fromKeyValues(Seq(kv), None) - .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) - } - } - } - } - - Future.sequence(futures).map(_.flatten) - } - - override def fetchVerticesAll()(implicit ec: ExecutionContext) = { - val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => - val distinctColumns = columns.toSet - val scan = AsynchbasePatcher.newScanner(client, hTableName) - scan.setFamily(Serializable.vertexCf) - scan.setMaxVersions(1) - - scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { - case null => Seq.empty - case kvsLs => - kvsLs.flatMap { kvs => - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs, None) - .filter(v => distinctColumns(v.serviceColumn)) - } - } - } - Future.sequence(futures).map(_.flatten) - } - - - /** - * we are using future cache to squash requests into same key on storage. - * - * @param queryRequest - * @param isInnerCall - * @param parentEdges - * @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future. - * seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback - */ - private def fetch(queryRequest: QueryRequest, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = { - - def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = { - val prevStepScore = queryRequest.prevStepScore - val fallbackFn: (Exception => StepResult) = { ex => - logger.error(s"fetchInner failed. fallback return. 
$hbaseRpc}", ex) - StepResult.Failure - } - - val queryParam = queryRequest.queryParam - fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs => - val (startOffset, len) = queryParam.label.schemaVersion match { - case HBaseType.VERSION4 => - val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset - (offset, queryParam.limit) - case _ => (0, kvs.length) - } - - io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len) - } - } - - val queryParam = queryRequest.queryParam - val cacheTTL = queryParam.cacheTTLInMillis - /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ - - val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges) - val request = buildRequest(queryRequest, edge) - - val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) - val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes) - - if (cacheTTL <= 0) fetchInner(request) - else { - val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey) - - // val cacheKeyBytes = toCacheKeyBytes(request) - val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) - } - } - - /** - * Private Methods which is specific to Asynchbase implementation. - */ - private def fetchKeyValuesInner(rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = { - rpc match { - case Left(get) => client.get(get) - case Right(ScanWithRange(scanner, offset, limit)) => - val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex => - logger.error(s"fetchKeyValuesInner failed.", ex) - scanner.close() - emptyKeyValues - } - - scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs => - val ls = new util.ArrayList[KeyValue] - if (kvsLs == null) { - } else { - kvsLs.foreach { kvs => - if (kvs != null) kvs.foreach { kv => ls.add(kv) } - else { - - } - } - } - - scanner.close() - val toIndex = Math.min(ls.size(), offset + limit) - new util.ArrayList[KeyValue](ls.subList(offset, toIndex)) - } - case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc")) - } - } - - private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = { - /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ - hbaseRpc match { - case Left(getRequest) => getRequest.key - case Right(ScanWithRange(scanner, offset, limit)) => - Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))) - case _ => - logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc") - throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala deleted file mode 100644 index b4236b9..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.hbase - -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.S2GraphLike -import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.utils.{Extensions, logger} -import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest} - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} - -class AsynchbaseStorageWritable(val graph: S2GraphLike, - val serDe: StorageSerDe, - val reader: StorageReadable, - val client: HBaseClient, - val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) { - import Extensions.DeferOps - - private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client - /** - * decide how to store given key values Seq[SKeyValue] into storage using storage's client. - * note that this should be return true on all success. - * we assumes that each storage implementation has client as member variable. - * - * @param cluster : where this key values should be stored. - * @param kvs : sequence of SKeyValue that need to be stored in storage. - * @param withWait : flag to control wait ack from storage. - * note that in AsynchbaseStorage(which support asynchronous operations), even with true, - * it never block thread, but rather submit work and notified by event loop when storage send ack back. - * @return ack message from storage. - */ - override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { - if (kvs.isEmpty) Future.successful(MutateResponse.Success) - else { - val _client = client(withWait) - val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment) - - /* Asynchbase IncrementRequest does not implement HasQualifiers */ - val incrementsFutures = increments.map { kv => - val countVal = Bytes.toLong(kv.value) - val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal) - val fallbackFn: (Exception => MutateResponse) = { ex => - logger.error(s"mutation failed. $request", ex) - new IncrementResponse(false, -1L, -1L) - } - val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long => - new IncrementResponse(true, resultCount.longValue(), countVal) - }.toFuture(MutateResponse.IncrementFailure) - - if (withWait) future else Future.successful(MutateResponse.IncrementSuccess) - } - - /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. 
*/ - val othersFutures = putAndDeletes.groupBy { kv => - (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp) - }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) => - - val durability = groupedKeyValues.head.durability - val qualifiers = new ArrayBuffer[Array[Byte]]() - val values = new ArrayBuffer[Array[Byte]]() - - groupedKeyValues.foreach { kv => - if (kv.qualifier != null) qualifiers += kv.qualifier - if (kv.value != null) values += kv.value - } - val defer = operation match { - case SKeyValue.Put => - val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp) - put.setDurable(durability) - _client.put(put) - case SKeyValue.Delete => - val delete = - if (qualifiers.isEmpty) - new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp) - else - new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp) - delete.setDurable(durability) - _client.delete(delete) - } - if (withWait) { - defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception => - groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) } - MutateResponse.Failure - } - } else Future.successful(MutateResponse.Success) - } - for { - incrementRets <- Future.sequence(incrementsFutures) - otherRets <- Future.sequence(othersFutures) - } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess)) - } - } - - /** - * write requestKeyValue into storage if the current value in storage that is stored matches. - * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. - * - * Most important thing is this have to be 'atomic' operation. - * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be - * either blocked or failed on write-write conflict case. - * - * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to - * prevent wrong data for read. - * - * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, - * compareAndSet to synchronize. - * - * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. - * for storage that does not support concurrency control, then storage implementation - * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) - * and write(writeLock). 
- * - * @param rpc - * @param expectedOpt - * @return - */ - override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = { - val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp) - val expected = expectedOpt.map(_.value).getOrElse(Array.empty) - client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true) - .map(r => new MutateResponse(isSuccess = r)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala new file mode 100644 index 0000000..e6bf4e6 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage.hbase + +import com.typesafe.config.Config +import org.apache.s2graph.core.schema.ServiceColumn +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.core.utils.Extensions +import org.apache.s2graph.core.{S2Graph, S2GraphLike, VertexBulkFetcher} +import org.hbase.async.HBaseClient + +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseVertexBulkFetcher(val graph: S2GraphLike, + val config: Config, + val client: HBaseClient, + val serDe: StorageSerDe, + val io: StorageIO) extends VertexBulkFetcher { + + import AsynchbaseStorage._ + import Extensions.DeferOps + + import scala.collection.JavaConverters._ + + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => + val distinctColumns = columns.toSet + val scan = AsynchbasePatcher.newScanner(client, hTableName) + scan.setFamily(Serializable.vertexCf) + scan.setMaxVersions(1) + + scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { + case null => Seq.empty + case kvsLs => + kvsLs.asScala.flatMap { kvs => + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None) + .filter(v => distinctColumns(v.serviceColumn)) + } + } + } + Future.sequence(futures).map(_.flatten) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala new file mode 100644 index 0000000..560dd2b --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage.hbase + +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.hbase.async.HBaseClient + +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseVertexFetcher(val graph: S2GraphLike, + val config: Config, + val client: HBaseClient, + val serDe: StorageSerDe, + val io: StorageIO) extends VertexFetcher { + import AsynchbaseStorage._ + + private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + val rpc = buildRequest(serDe, queryRequest, vertex) + AsynchbaseStorage.fetchKeyValues(client, rpc) + } + + override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { + def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = { + if (kvs.isEmpty) Nil + else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq + } + + val futures = vertices.map { vertex => + val queryParam = QueryParam.Empty + val q = Query.toQuery(Seq(vertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + + fetchKeyValues(queryRequest, vertex).map { kvs => + fromResult(kvs, vertex.serviceColumn.schemaVersion) + } recoverWith { + case ex: Throwable => Future.successful(Nil) + } + } + + Future.sequence(futures).map(_.flatten) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala new file mode 100644 index 0000000..2ca4b35 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage.rocks + +import com.typesafe.config.Config +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2GraphLike} +import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.HBaseType +import org.rocksdb.RocksDB + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +class RocksEdgeBulkFetcher(val graph: S2GraphLike, + val config: Config, + val db: RocksDB, + val vdb: RocksDB, + val serDe: StorageSerDe, + val io: StorageIO) extends EdgeBulkFetcher { + import RocksStorage._ + + override def fetchEdgesAll()(implicit ec: ExecutionContext) = { + val edges = new ArrayBuffer[S2EdgeLike]() + Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) => + val distinctLabels = labels.toSet + + val iter = db.newIterator() + try { + iter.seekToFirst() + while (iter.isValid) { + val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) + + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) + .foreach { edge => + edges += edge + } + + + iter.next() + } + + } finally { + iter.close() + } + } + + Future.successful(edges) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala new file mode 100644 index 0000000..628c5e1 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage.rocks + +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.VertexId +import org.rocksdb.RocksDB + +import scala.concurrent.{ExecutionContext, Future} + +class RocksEdgeFetcher(val graph: S2GraphLike, + val config: Config, + val db: RocksDB, + val vdb: RocksDB, + val serDe: StorageSerDe, + val io: StorageIO) extends EdgeFetcher { + import RocksStorage._ + + override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { + val futures = for { + queryRequest <- queryRequests + } yield { + val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) + val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges) + val rpc = buildRequest(graph, serDe, queryRequest, edge) + fetchKeyValues(vdb, db, rpc).map { kvs => + val queryParam = queryRequest.queryParam + val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges) + val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore => + val edge = edgeWithScore.edge + val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue)) + edge.ts >= duration._1 && edge.ts < duration._2 + } + + stepResult.copy(edgeWithScores = edgeWithScores) + } + } + + Future.sequence(futures) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala new file mode 100644 index 0000000..6513442 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage.rocks + +import com.typesafe.config.Config +import org.apache.s2graph.core.{QueryRequest, S2EdgeLike, S2GraphLike} +import org.apache.s2graph.core.storage.{OptimisticEdgeFetcher, SKeyValue, StorageIO, StorageSerDe} +import org.rocksdb.RocksDB + +import scala.concurrent.{ExecutionContext, Future} + +class RocksOptimisticEdgeFetcher(val graph: S2GraphLike, + val config: Config, + val db: RocksDB, + val vdb: RocksDB, + val serDe: StorageSerDe, + val io: StorageIO) extends OptimisticEdgeFetcher { + + override protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + val request = RocksStorage.buildRequest(graph, serDe, queryRequest, edge) + + RocksStorage.fetchKeyValues(vdb, db, request) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala new file mode 100644 index 0000000..ec77d3f --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core.storage.rocks + +import java.util.concurrent.locks.ReentrantLock + +import com.google.common.cache.{Cache, LoadingCache} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.S2GraphLike +import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.utils.logger +import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions} + +import scala.concurrent.{ExecutionContext, Future} + +class RocksOptimisticMutator(val graph: S2GraphLike, + val serDe: StorageSerDe, + val optimisticEdgeFetcher: OptimisticEdgeFetcher, + val db: RocksDB, + val vdb: RocksDB, + val lockMap: LoadingCache[String, ReentrantLock]) extends OptimisticMutator { + + override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { + if (kvs.isEmpty) { + Future.successful(MutateResponse.Success) + } else { + val ret = { + val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf)) + val writeBatchV = buildWriteBatch(kvsV) + val writeBatchE = buildWriteBatch(kvsE) + val writeOptions = new WriteOptions + try { + vdb.write(writeOptions, writeBatchV) + db.write(writeOptions, writeBatchE) + true + } catch { + case e: Exception => + logger.error(s"writeAsyncSimple failed.", e) + false + } finally { + writeBatchV.close() + writeBatchE.close() + writeOptions.close() + } + } + + Future.successful(new MutateResponse(ret)) + } + } + + + override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = { + def op = { + val writeOptions = new WriteOptions + try { + val fetchedValue = db.get(requestKeyValue.row) + val innerRet = expectedOpt match { + case None => + if (fetchedValue == null) { + + db.put(writeOptions, requestKeyValue.row, requestKeyValue.value) + true + } else { + false + } + case Some(kv) => + if (fetchedValue == null) { + false + } else { + if (Bytes.compareTo(fetchedValue, kv.value) == 0) { + db.put(writeOptions, requestKeyValue.row, requestKeyValue.value) + true + } else { + false + } + } + } + + Future.successful(new MutateResponse(innerRet)) + } catch { + case e: RocksDBException => + logger.error(s"Write lock failed", e) + Future.successful(MutateResponse.Failure) + } finally { + writeOptions.close() + } + } + + withLock(requestKeyValue.row)(op) + } + + private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = { + val writeBatch = new WriteBatch() + kvs.foreach { kv => + kv.operation match { + case SKeyValue.Put => writeBatch.put(kv.row, kv.value) + case SKeyValue.Delete => writeBatch.remove(kv.row) + case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value) + case _ => throw new RuntimeException(s"not supported rpc operation. 
${kv.operation}") + } + } + writeBatch + } + + private def withLock[A](key: Array[Byte])(op: => A): A = { + val lockKey = Bytes.toString(key) + val lock = lockMap.get(lockKey) + + try { + lock.lock + op + } finally { + lock.unlock() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala index b24e375..8948e13 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala @@ -19,22 +19,30 @@ package org.apache.s2graph.core.storage.rocks +import java.util.Base64 import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.hash.Hashing import com.typesafe.config.Config +import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe} -import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC +import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange} +import org.apache.s2graph.core.storage.serde.StorageSerializable +import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.utils.logger import org.rocksdb._ import org.rocksdb.util.SizeUnit +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Random, Try} object RocksStorage { + val table = Array.emptyByteArray + val qualifier = Array.emptyByteArray RocksDB.loadLibrary() @@ -129,6 +137,84 @@ object RocksStorage { throw e } } + + def buildRequest(graph: S2GraphLike, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = { + queryRequest.queryParam.tgtVertexInnerIdOpt match { + case None => // indexEdges + val queryParam = queryRequest.queryParam + val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption + val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + + val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) + val (startKey, stopKey) = + if (queryParam.intervalOpt.isDefined) { + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => Bytes.add(baseKey, intervalMaxBytes) + } + (_startKey, Bytes.add(baseKey, intervalMinBytes)) + } else { + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => baseKey + } + (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) + } + + Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, 
queryParam.innerLimit)) + + case Some(tgtId) => // snapshotEdge + val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head + Left(GetRequest(SKeyValue.EdgeCf, kv.row)) + } + } + + def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = { + val startKey = vertex.id.bytes + val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue)) + + Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue)) + } + + def fetchKeyValues(vdb: RocksDB, db: RocksDB, rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + rpc match { + case Left(GetRequest(cf, key)) => + val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db + val v = _db.get(key) + + val kvs = + if (v == null) Seq.empty + else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis())) + + Future.successful(kvs) + case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) => + val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db + val kvs = new ArrayBuffer[SKeyValue]() + val iter = _db.newIterator() + + try { + var idx = 0 + iter.seek(startKey) + val (startOffset, len) = (offset, limit) + while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) { + if (idx >= startOffset) { + kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis()) + } + + iter.next() + idx += 1 + } + } finally { + iter.close() + } + Future.successful(kvs) + } + } } class RocksStorage(override val graph: S2GraphLike, @@ -150,12 +236,18 @@ class RocksStorage(override val graph: S2GraphLike, .maximumSize(1000 * 10 * 10 * 10 * 10) .build[String, ReentrantLock](cacheLoader) - override val management: StorageManagement = new RocksStorageManagement(config, vdb, db) + private lazy val optimisticEdgeFetcher = new RocksOptimisticEdgeFetcher(graph, config, db, vdb, serDe, io) + private lazy val optimisticMutator = new RocksOptimisticMutator(graph, serDe, optimisticEdgeFetcher, db, vdb, lockMap) + private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator) + override val management: StorageManagement = new RocksStorageManagement(config, vdb, db) override val serDe: StorageSerDe = new RocksStorageSerDe(graph) - override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io) - - override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap) + override val edgeFetcher: EdgeFetcher = new RocksEdgeFetcher(graph, config, db, vdb, serDe, io) + override val edgeBulkFetcher: EdgeBulkFetcher = new RocksEdgeBulkFetcher(graph, config, db, vdb, serDe, io) + override val vertexFetcher: VertexFetcher = new RocksVertexFetcher(graph, config, db, vdb, serDe, io) + override val vertexBulkFetcher: VertexBulkFetcher = new RocksVertexBulkFetcher(graph, config, db, vdb, serDe, io) + override val edgeMutator: EdgeMutator = _mutator + override val vertexMutator: VertexMutator = _mutator } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala deleted file mode 100644 index 27e3efd..0000000 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.rocks - -import java.util.Base64 - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core._ -import org.apache.s2graph.core.schema.{Label, ServiceColumn} -import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange} -import org.apache.s2graph.core.storage.serde.StorageSerializable -import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.types.{HBaseType, VertexId} -import org.rocksdb.RocksDB - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} - -class RocksStorageReadable(val graph: S2GraphLike, - val config: Config, - val db: RocksDB, - val vdb: RocksDB, - val serDe: StorageSerDe, - override val io: StorageIO) extends StorageReadable { - - private val table = Array.emptyByteArray - private val qualifier = Array.emptyByteArray - - private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = { - queryRequest.queryParam.tgtVertexInnerIdOpt match { - case None => // indexEdges - val queryParam = queryRequest.queryParam - val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption - val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) - val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes - val labelWithDirBytes = indexEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - - val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - - val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) - val (startKey, stopKey) = - if (queryParam.intervalOpt.isDefined) { - val _startKey = queryParam.cursorOpt match { - case Some(cursor) => Base64.getDecoder.decode(cursor) - case None => Bytes.add(baseKey, intervalMaxBytes) - } - (_startKey, Bytes.add(baseKey, intervalMinBytes)) - } else { - val _startKey = queryParam.cursorOpt match { - case Some(cursor) => Base64.getDecoder.decode(cursor) - case None => baseKey - } - (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) - } - - Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit)) - - case Some(tgtId) => // snapshotEdge - val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head - 
Left(GetRequest(SKeyValue.EdgeCf, kv.row)) - } - } - - private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = { - val startKey = vertex.id.bytes - val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue)) - - Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue)) -// val kv = serDe.vertexSerializer(vertex).toKeyValues.head -// Left(GetRequest(SKeyValue.VertexCf, kv.row)) - } - - override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { - val futures = for { - queryRequest <- queryRequests - } yield { - val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) - val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges) - val rpc = buildRequest(queryRequest, edge) - fetchKeyValues(rpc).map { kvs => - val queryParam = queryRequest.queryParam - val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges) - val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore => - val edge = edgeWithScore.edge - val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue)) - edge.ts >= duration._1 && edge.ts < duration._2 - } - - stepResult.copy(edgeWithScores = edgeWithScores) - } - } - - Future.sequence(futures) - } - - private def fetchKeyValues(rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { - rpc match { - case Left(GetRequest(cf, key)) => - val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db - val v = _db.get(key) - - val kvs = - if (v == null) Seq.empty - else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis())) - - Future.successful(kvs) - case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) => - val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db - val kvs = new ArrayBuffer[SKeyValue]() - val iter = _db.newIterator() - - try { - var idx = 0 - iter.seek(startKey) - val (startOffset, len) = (offset, limit) - while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) { - if (idx >= startOffset) { - kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis()) - } - - iter.next() - idx += 1 - } - } finally { - iter.close() - } - Future.successful(kvs) - } - } - - override def fetchEdgesAll()(implicit ec: ExecutionContext) = { - val edges = new ArrayBuffer[S2EdgeLike]() - Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) => - val distinctLabels = labels.toSet - - val iter = db.newIterator() - try { - iter.seekToFirst() - while (iter.isValid) { - val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) - - serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) - .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) - .foreach { edge => - edges += edge - } - - - iter.next() - } - - } finally { - iter.close() - } - } - - Future.successful(edges) - } - - override def fetchVerticesAll()(implicit ec: ExecutionContext) = { - import scala.collection.mutable - - val vertices = new ArrayBuffer[S2VertexLike]() - ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) => - val distinctColumns = columns.toSet - - val iter = vdb.newIterator() - val buffer = mutable.ListBuffer.empty[SKeyValue] - var oldVertexIdBytes = 
Array.empty[Byte] - var minusPos = 0 - - try { - iter.seekToFirst() - while (iter.isValid) { - val row = iter.key() - if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) { - if (buffer.nonEmpty) - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) - .filter(v => distinctColumns(v.serviceColumn)) - .foreach { vertex => - vertices += vertex - } - - oldVertexIdBytes = row - minusPos = 1 - buffer.clear() - } - val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis()) - buffer += kv - - iter.next() - } - if (buffer.nonEmpty) - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) - .filter(v => distinctColumns(v.serviceColumn)) - .foreach { vertex => - vertices += vertex - } - - } finally { - iter.close() - } - } - - Future.successful(vertices) - } - - override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = { - fetchKeyValues(buildRequest(queryRequest, edge)) - } - - override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = { - fetchKeyValues(buildRequest(queryRequest, vertex)) - } -}
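For reference, buildRequest above bounds an index-edge scan with a row-key prefix (source vertex id bytes ++ label/direction bytes ++ index sequence bytes) as the start key, and that prefix plus a single 0xFF byte (the Array.fill(1)(-1) above) as the stop key. A minimal standalone sketch of that bound, using the same HBase Bytes helper the storage code relies on; the prefix contents here are invented for illustration:

import org.apache.hadoop.hbase.util.Bytes

object PrefixScanBoundSketch extends App {
  // Hypothetical prefix; the real code concatenates srcIdBytes, labelWithDirBytes
  // and the labelOrderSeqWithIsInverted bytes.
  val prefix: Array[Byte] = Bytes.toBytes("srcId|labelWithDir|indexSeq")

  val startKey: Array[Byte] = prefix
  // Appending 0xFF gives the upper bound: any key extending the prefix with a byte
  // below 0xFF still satisfies Bytes.compareTo(key, stopKey) <= 0 in the scan loop.
  val stopKey: Array[Byte] = Bytes.add(prefix, Array.fill[Byte](1)(-1))

  val candidate = Bytes.add(prefix, Bytes.toBytes("some-qualifier"))
  println(Bytes.compareTo(candidate, stopKey) <= 0) // true: candidate falls inside the range
}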
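RocksStorage.fetchKeyValues above dispatches on the column family: VertexCf requests are answered from vdb, everything else from db; a GetRequest becomes a single point lookup, while a ScanWithRange walks an iterator honoring offset and limit. A hedged usage sketch against throwaway databases, assuming RocksRPC is the Either alias used above; the paths and keys are invented:

import org.apache.s2graph.core.storage.SKeyValue
import org.apache.s2graph.core.storage.rocks.RocksHelper.GetRequest
import org.apache.s2graph.core.storage.rocks.RocksStorage
import org.rocksdb.{Options, RocksDB}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FetchKeyValuesSketch extends App {
  RocksDB.loadLibrary()
  val options = new Options().setCreateIfMissing(true)
  val vdb = RocksDB.open(options, "/tmp/rocks-vertex-sketch") // invented path
  val db  = RocksDB.open(options, "/tmp/rocks-edge-sketch")   // invented path

  val row = "edge-row".getBytes("UTF-8")
  db.put(row, "edge-value".getBytes("UTF-8"))

  // SKeyValue.EdgeCf routes the lookup to db; SKeyValue.VertexCf would route it to vdb.
  val kvs = Await.result(
    RocksStorage.fetchKeyValues(vdb, db, Left(GetRequest(SKeyValue.EdgeCf, row))), 5.seconds)

  kvs.foreach(kv => println(new String(kv.value, "UTF-8"))) // prints "edge-value"

  db.close(); vdb.close()
}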
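RocksOptimisticMutator.writeLock above is a compare-and-set: read the stored value for the row, compare it with the expected snapshot (None meaning the row must be absent), and only then write the new value, all while holding a per-row ReentrantLock taken from a Guava LoadingCache keyed by the row string. A simplified restatement of that pattern, not the project's exact code:

import java.util.concurrent.locks.ReentrantLock

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.hbase.util.Bytes
import org.rocksdb.RocksDB

class RowCompareAndSet(db: RocksDB) {
  // One in-process lock per row key, evicted by the cache once it grows past maximumSize.
  private val lockMap: LoadingCache[String, ReentrantLock] =
    CacheBuilder.newBuilder()
      .maximumSize(10000)
      .build[String, ReentrantLock](new CacheLoader[String, ReentrantLock] {
        override def load(key: String): ReentrantLock = new ReentrantLock()
      })

  private def withLock[A](row: Array[Byte])(op: => A): A = {
    val lock = lockMap.get(Bytes.toString(row))
    lock.lock()
    try op finally lock.unlock()
  }

  /** Write `update` only if the stored value matches `expected`; None means "row must be absent". */
  def compareAndSet(row: Array[Byte], expected: Option[Array[Byte]], update: Array[Byte]): Boolean =
    withLock(row) {
      val current = Option(db.get(row))
      val matches = (expected, current) match {
        case (None, None)       => true
        case (Some(e), Some(c)) => Bytes.compareTo(e, c) == 0
        case _                  => false
      }
      if (matches) db.put(row, update)
      matches
    }
}

Note that the in-JVM lock only serializes writers within a single process; it offers no protection against concurrent writers running elsewhere.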
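writeToStorage above partitions the incoming SKeyValues by column family, applies VertexCf mutations to vdb and the rest to db, and maps SKeyValue.Put/Delete/Increment onto WriteBatch put/remove/merge. A minimal sketch of the WriteBatch mechanics alone (plain RocksDB usage, not the project's SKeyValue mapping; the path is invented):

import org.rocksdb.{Options, RocksDB, WriteBatch, WriteOptions}

object WriteBatchSketch extends App {
  RocksDB.loadLibrary()
  val options = new Options().setCreateIfMissing(true)
  val db = RocksDB.open(options, "/tmp/rocks-writebatch-sketch")

  val batch = new WriteBatch()
  val writeOptions = new WriteOptions()
  try {
    // Every entry in the batch is applied atomically by the single db.write call.
    batch.put("row-1".getBytes("UTF-8"), "value-1".getBytes("UTF-8"))
    batch.remove("row-0".getBytes("UTF-8")) // later RocksJava releases deprecate remove in favor of delete
    // An SKeyValue.Increment would become batch.merge(row, delta), which additionally
    // requires a merge operator to be configured on the Options.
    db.write(writeOptions, batch)
  } finally {
    batch.close()
    writeOptions.close()
    db.close()
  }
}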
