provide default implementation on fetchVertices at StorageReadable.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/af9c1053
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/af9c1053
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/af9c1053

Branch: refs/heads/master
Commit: af9c1053fcf9f0392290636c78c014e009edc589
Parents: 55d194e
Author: DO YUNG YOON <[email protected]>
Authored: Mon Oct 30 18:28:51 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Oct 30 18:28:51 2017 +0900

----------------------------------------------------------------------
 .../s2graph/core/storage/StorageReadable.scala  | 26 +++++++++++++++++---
 .../hbase/AsynchbaseStorageReadable.scala       | 21 ----------------
 2 files changed, 23 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/af9c1053/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 03b01fd..052ca69 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -27,6 +27,7 @@ import org.apache.s2graph.core.utils.logger
 import scala.concurrent.{ExecutionContext, Future}
 
 trait StorageReadable {
+  val serDe: StorageSerDe
   val io: StorageIO
  /**
     * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
@@ -38,9 +39,6 @@ trait StorageReadable {
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
 
-// private def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): 
Future[Seq[SKeyValue]]
-  def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): 
Future[Seq[S2Vertex]]
-
   def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]]
 
   def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]]
@@ -49,6 +47,7 @@ trait StorageReadable {
 
   protected def fetchKeyValues(queryRequest: QueryRequest, vertex: 
S2Vertex)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 
+
   def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): 
Future[(Option[S2Edge], Option[SKeyValue])] = {
     val queryParam = QueryParam(labelName = edge.innerLabel.label,
       direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
@@ -72,4 +71,25 @@ trait StorageReadable {
       throw new FetchTimeoutException(s"${edge.toLogString}")
     }
   }
+
+  def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): 
Future[Seq[S2Vertex]] = {
+    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = {
+      if (kvs.isEmpty) Nil
+      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+    }
+
+    val futures = vertices.map { vertex =>
+      val queryParam = QueryParam.Empty
+      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
+      fetchKeyValues(queryRequest, vertex).map { kvs =>
+        fromResult(kvs, vertex.serviceColumn.schemaVersion)
+      } recoverWith {
+        case ex: Throwable => Future.successful(Nil)
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/af9c1053/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 0dc8491..8ff0ee0 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -246,27 +246,6 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
     Future.sequence(futures).map(_.flatten)
   }
 
-  override def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: 
ExecutionContext) = {
-    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = {
-      if (kvs.isEmpty) Nil
-      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
-    }
-
-    val futures = vertices.map { vertex =>
-      val queryParam = QueryParam.Empty
-      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
-      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
-      fetchKeyValues(queryRequest, vertex).map { kvs =>
-        fromResult(kvs, vertex.serviceColumn.schemaVersion)
-      } recoverWith {
-        case ex: Throwable => Future.successful(Nil)
-      }
-    }
-
-    Future.sequence(futures).map(_.flatten)
-  }
-
   override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
     val futures = 
ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case 
(hTableName, columns) =>
       val distinctColumns = columns.toSet

Reply via email to