Provide a default implementation of fetchVertices in StorageReadable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/af9c1053 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/af9c1053 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/af9c1053 Branch: refs/heads/master Commit: af9c1053fcf9f0392290636c78c014e009edc589 Parents: 55d194e Author: DO YUNG YOON <[email protected]> Authored: Mon Oct 30 18:28:51 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Oct 30 18:28:51 2017 +0900 ---------------------------------------------------------------------- .../s2graph/core/storage/StorageReadable.scala | 26 +++++++++++++++++--- .../hbase/AsynchbaseStorageReadable.scala | 21 ---------------- 2 files changed, 23 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/af9c1053/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index 03b01fd..052ca69 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -27,6 +27,7 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} trait StorageReadable { + val serDe: StorageSerDe val io: StorageIO /** * responsible to fire parallel fetch call into storage and create future that will return merged result. 
@@ -38,9 +39,6 @@ trait StorageReadable { def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] -// private def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] - def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] @@ -49,6 +47,7 @@ trait StorageReadable { protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] + def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = { val queryParam = QueryParam(labelName = edge.innerLabel.label, direction = GraphUtil.fromDirection(edge.labelWithDir.dir), @@ -72,4 +71,25 @@ trait StorageReadable { throw new FetchTimeoutException(s"${edge.toLogString}") } } + + def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = { + def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = { + if (kvs.isEmpty) Nil + else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq + } + + val futures = vertices.map { vertex => + val queryParam = QueryParam.Empty + val q = Query.toQuery(Seq(vertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + + fetchKeyValues(queryRequest, vertex).map { kvs => + fromResult(kvs, vertex.serviceColumn.schemaVersion) + } recoverWith { + case ex: Throwable => Future.successful(Nil) + } + } + + Future.sequence(futures).map(_.flatten) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/af9c1053/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index 0dc8491..8ff0ee0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -246,27 +246,6 @@ class AsynchbaseStorageReadable(val graph: S2Graph, Future.sequence(futures).map(_.flatten) } - override def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext) = { - def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = { - if (kvs.isEmpty) Nil - else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq - } - - val futures = vertices.map { vertex => - val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - - fetchKeyValues(queryRequest, vertex).map { kvs => - fromResult(kvs, vertex.serviceColumn.schemaVersion) - } recoverWith { - case ex: Throwable => Future.successful(Nil) - } - } - - Future.sequence(futures).map(_.flatten) - } - override def fetchVerticesAll()(implicit ec: ExecutionContext) = { val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => val distinctColumns = columns.toSet
