remove type parameter on Storage Trait (#13)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/55d194ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/55d194ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/55d194ed Branch: refs/heads/master Commit: 55d194ed8edfb0ec78ce5723c379c20fa7462632 Parents: f416194 Author: daewon <[email protected]> Authored: Mon Oct 30 17:07:48 2017 +0900 Committer: Doyung Yoon <[email protected]> Committed: Mon Oct 30 10:07:48 2017 +0200 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/core/S2Graph.scala | 54 ++++++-------------- .../apache/s2graph/core/storage/SKeyValue.scala | 13 +++-- .../apache/s2graph/core/storage/Storage.scala | 16 ++---- .../s2graph/core/storage/StorageReadable.scala | 30 +++++------ .../storage/WriteWriteConflictResolver.scala | 6 +-- .../core/storage/hbase/AsynchbaseStorage.scala | 5 +- .../hbase/AsynchbaseStorageReadable.scala | 46 ++++++++++++++--- 7 files changed, 84 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 92f68dc..34db9e4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -133,7 +133,7 @@ object S2Graph { new S2Graph(configuration)(ec) } - def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_] = { + def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage = { val storageBackend = config.getString("s2graph.storage.backend") logger.info(s"[InitStorage]: $storageBackend") @@ -908,7 +908,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph /** * TODO: we need to some way to handle malformed configuration for storage. */ - val storagePool: scala.collection.mutable.Map[String, Storage[_]] = { + val storagePool: scala.collection.mutable.Map[String, Storage] = { val labels = Label.findAll() val services = Service.findAll() @@ -919,12 +919,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph confWithFallback(conf) }.toSet - val pools = new java.util.HashMap[Config, Storage[_]]() + val pools = new java.util.HashMap[Config, Storage]() configs.foreach { config => pools.put(config, S2Graph.initStorage(this, config)(ec)) } - val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_]]() + val m = new java.util.concurrent.ConcurrentHashMap[String, Storage]() labels.foreach { label => if (label.storageConfigOpt.isDefined) { @@ -941,7 +941,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph m } - val defaultStorage: Storage[_] = S2Graph.initStorage(this, config)(ec) + val defaultStorage: Storage = S2Graph.initStorage(this, config)(ec) /** QueryLevel FutureCache */ val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty) @@ -953,11 +953,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val indexProvider = IndexProvider.apply(config) - def getStorage(service: Service): Storage[_] = { + def getStorage(service: Service): Storage = { storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) } - def getStorage(label: Label): Storage[_] = { + def getStorage(label: Label): Storage = { storagePool.getOrElse(s"label:${label.label}", defaultStorage) } @@ -975,7 +975,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val futures = for { edge <- edges } yield { - getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (_, edgeOpt, _) => + getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) => edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel)) } } @@ -1145,31 +1145,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { - def getVertices[Q](storage: Storage[Q])(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { - def fromResult(kvs: Seq[SKeyValue], - version: String): Option[S2Vertex] = { - if (kvs.isEmpty) None - else storage.vertexDeserializer(version).fromKeyValues(kvs, None) - // .map(S2Vertex(graph, _)) - } - - val futures = vertices.map { vertex => - val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - val rpc = storage.buildRequest(queryRequest, vertex) - storage.fetchKeyValues(rpc).map { kvs => - fromResult(kvs, vertex.serviceColumn.schemaVersion) - } recoverWith { case ex: Throwable => - Future.successful(None) - } - } - - Future.sequence(futures).map { result => result.toList.flatten } - } val verticesWithIdx = vertices.zipWithIndex val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getVertices(getStorage(service))(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2))) + getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2))) } Future.sequence(futures).map { ls => @@ -1272,7 +1250,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - private def deleteAllFetchedEdgesAsyncOld(storage: Storage[_])(stepInnerResult: StepResult, + private def deleteAllFetchedEdgesAsyncOld(storage: Storage)(stepInnerResult: StepResult, requestTs: Long, retryNum: Int): Future[Boolean] = { if (stepInnerResult.isEmpty) Future.successful(true) @@ -1426,7 +1404,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - private def mutateStrongEdges(storage: Storage[_])(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = { + private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = { val edgeWithIdxs = _edges.zipWithIndex val grouped = edgeWithIdxs.groupBy { case (edge, idx) => @@ -1462,7 +1440,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } - private def mutateEdgesInner(storage: Storage[_])(edges: Seq[S2Edge], + private def mutateEdgesInner(storage: Storage)(edges: Seq[S2Edge], checkConsistency: Boolean, withWait: Boolean): Future[MutateResponse] = { assert(edges.nonEmpty) @@ -1483,14 +1461,14 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } } else { - storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) } } } def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = { - def mutateVertex(storage: Storage[_])(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = { + def mutateVertex(storage: Storage)(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = { if (vertex.op == GraphUtil.operations("delete")) { storage.writeToStorage(vertex.hbaseZkAddr, storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) @@ -1502,7 +1480,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - def mutateVertices(storage: Storage[_])(vertices: Seq[S2Vertex], + def mutateVertices(storage: Storage)(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) } Future.sequence(futures) @@ -1518,7 +1496,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { - def incrementCounts(storage: Storage[_])(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { + def incrementCounts(storage: Storage)(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { val futures = for { edge <- edges } yield { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala index 924d9a3..775afda 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -56,18 +56,17 @@ trait CanSKeyValue[T] { } object CanSKeyValue { + def instance[T](f: T => SKeyValue): CanSKeyValue[T] = new CanSKeyValue[T] { + override def toSKeyValue(from: T): SKeyValue = f.apply(from) + } // For asyncbase KeyValues - implicit val asyncKeyValue = new CanSKeyValue[KeyValue] { - def toSKeyValue(kv: KeyValue): SKeyValue = { - SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp()) - } + implicit val asyncKeyValue = instance[KeyValue] { kv => + SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp()) } // For asyncbase KeyValues - implicit val sKeyValue = new CanSKeyValue[SKeyValue] { - def toSKeyValue(kv: SKeyValue): SKeyValue = kv - } + implicit val sKeyValue = instance[SKeyValue](identity) // For hbase KeyValues } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 2fe6e42..e4eafbf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -27,7 +27,7 @@ import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializa import org.apache.s2graph.core.types._ import scala.concurrent.{ExecutionContext, Future} -abstract class Storage[Q](val graph: S2Graph, +abstract class Storage(val graph: S2Graph, val config: Config) { /* Storage backend specific resource management */ val management: StorageManagement @@ -39,7 +39,7 @@ abstract class Storage[Q](val graph: S2Graph, * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage * then convert them into Edge/Vertex */ - val fetcher: StorageReadable[Q] + val fetcher: StorageReadable /* * Serialize Edge/Vertex, to common KeyValue, SKeyValue that @@ -61,7 +61,6 @@ abstract class Storage[Q](val graph: S2Graph, */ lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher) - /** IO **/ def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] = serDe.snapshotEdgeSerializer(snapshotEdge) @@ -115,22 +114,18 @@ abstract class Storage[Q](val graph: S2Graph, mutator.writeLock(requestKeyValue, expectedOpt) /** Fetch **/ - - def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q = fetcher.buildRequest(queryRequest, edge) - - def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q = fetcher.buildRequest(queryRequest, vertex) - def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = fetcher.fetches(queryRequests, prevStepEdges) - def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = fetcher.fetchKeyValues(rpc) + def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = + fetcher.fetchVertices(vertices) def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll() def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = fetcher.fetchVerticesAll() - def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = + def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = fetcher.fetchSnapshotEdgeInner(edge) /** Conflict Resolver **/ @@ -149,6 +144,5 @@ abstract class Storage[Q](val graph: S2Graph, def shutdown(): Unit = management.shutdown() - def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index 7a0d8ef..03b01fd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -26,21 +26,9 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} -trait StorageReadable[Q] { +trait StorageReadable { val io: StorageIO - /** - * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues. - * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build - * client request(GetRequest, Scanner) based on user provided query. - * - * @param queryRequest - * @return - */ - def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q - - def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q - - /** + /** * responsible to fire parallel fetch call into storage and create future that will return merged result. * * @param queryRequests @@ -50,13 +38,18 @@ trait StorageReadable[Q] { def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] - def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] +// private def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] + def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] - def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { + protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] + + protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] + + def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = { val queryParam = QueryParam(labelName = edge.innerLabel.label, direction = GraphUtil.fromDirection(edge.labelWithDir.dir), tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), @@ -64,15 +57,16 @@ trait StorageReadable[Q] { val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) - fetchKeyValues(buildRequest(queryRequest, edge)).map { kvs => + fetchKeyValues(queryRequest, edge).map { kvs => val (edgeOpt, kvOpt) = if (kvs.isEmpty) (None, None) else { + import CanSKeyValue._ val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) val _kvOpt = kvs.headOption (snapshotEdgeOpt, _kvOpt) } - (queryParam, edgeOpt, kvOpt) + (edgeOpt, kvOpt) } recoverWith { case ex: Throwable => logger.error(s"fetchQueryParam failed. fallback return.", ex) throw new FetchTimeoutException(s"${edge.toLogString}") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index 79b764d..227cfa7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -32,7 +32,7 @@ class WriteWriteConflictResolver(graph: S2Graph, serDe: StorageSerDe, io: StorageIO, mutator: StorageWritable, - fetcher: StorageReadable[_]) { + fetcher: StorageReadable) { val BackoffTimeout = graph.BackoffTimeout val MaxRetryNum = graph.MaxRetryNum @@ -69,7 +69,7 @@ class WriteWriteConflictResolver(graph: S2Graph, case FetchTimeoutException(retryEdge) => logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") /* fetch failed. re-fetch should be done */ - fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } @@ -91,7 +91,7 @@ class WriteWriteConflictResolver(graph: S2Graph, val future = if (failedStatusCode == 0) { // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge. /* fetch failed. re-fetch should be done */ - fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } } else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 54007d5..ef1350a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -27,7 +27,6 @@ import com.typesafe.config.Config import org.apache.commons.io.FileUtils import org.apache.s2graph.core._ import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.AsyncRPC import org.apache.s2graph.core.utils._ import org.hbase.async._ import org.apache.s2graph.core.storage.serde._ @@ -140,7 +139,7 @@ object AsynchbaseStorage { class AsynchbaseStorage(override val graph: S2Graph, - override val config: Config) extends Storage[AsyncRPC](graph, config) { + override val config: Config) extends Storage(graph, config) { /** * since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase. @@ -156,7 +155,7 @@ class AsynchbaseStorage(override val graph: S2Graph, override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) - override val fetcher: StorageReadable[AsyncRPC] = new AsynchbaseStorageReadable(graph, config, client, serDe, io) + override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) // val hbaseExecutor: ExecutorService = // if (config.getString("hbase.zookeeper.quorum") == "localhost") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index 1cb6109..0dc8491 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -14,7 +14,7 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * */ package org.apache.s2graph.core.storage.hbase @@ -42,7 +42,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, val config: Config, val client: HBaseClient, val serDe: StorageSerDe, - override val io: StorageIO) extends StorageReadable[AsyncRPC] { + override val io: StorageIO) extends StorageReadable { import Extensions.DeferOps import CanDefer._ @@ -67,7 +67,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, * @param queryRequest * @return */ - override def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = { + private def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = { import Serializable._ val queryParam = queryRequest.queryParam val label = queryParam.label @@ -168,15 +168,26 @@ class AsynchbaseStorageReadable(val graph: S2Graph, * @param vertex * @return */ - override def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = { + private def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = { val kvs = serDe.vertexSerializer(vertex).toKeyValues val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) // get.setTimeout(this.singleGetTimeout.toShort) get.setFailfast(true) get.maxVersions(1) + Left(get) } + override def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext) = { + val rpc = buildRequest(queryRequest, edge) + fetchKeyValues(rpc) + } + + override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext) = { + val rpc = buildRequest(queryRequest, vertex) + fetchKeyValues(rpc) + } + /** * responsible to fire parallel fetch call into storage and create future that will return merged result. * @@ -201,7 +212,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, }.toFuture(emptyStepResult) } - override def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = { + def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = { val defer = fetchKeyValuesInner(rpc) defer.toFuture(emptyKeyValues).map { kvsArr => kvsArr.map { kv => @@ -224,7 +235,8 @@ class AsynchbaseStorageReadable(val graph: S2Graph, kvs.flatMap { kv => val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION) + .fromKeyValues(Seq(kv), None) .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree) } } @@ -234,6 +246,27 @@ class AsynchbaseStorageReadable(val graph: S2Graph, Future.sequence(futures).map(_.flatten) } + override def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext) = { + def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = { + if (kvs.isEmpty) Nil + else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq + } + + val futures = vertices.map { vertex => + val queryParam = QueryParam.Empty + val q = Query.toQuery(Seq(vertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + + fetchKeyValues(queryRequest, vertex).map { kvs => + fromResult(kvs, vertex.serviceColumn.schemaVersion) + } recoverWith { + case ex: Throwable => Future.successful(Nil) + } + } + + Future.sequence(futures).map(_.flatten) + } + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => val distinctColumns = columns.toSet @@ -351,4 +384,5 @@ class AsynchbaseStorageReadable(val graph: S2Graph, throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") } } + }
