- Refactor WhereParser to accept GraphElement. - Refactor S2Graph.getVertices to accept VertexQueryParam.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f47622fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f47622fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f47622fd Branch: refs/heads/master Commit: f47622fdeaee9fef9d46bf83c6191204dad00043 Parents: 3b692fb Author: DO YUNG YOON <[email protected]> Authored: Fri Jun 15 15:35:16 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Jun 15 15:35:16 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/QueryParam.scala | 17 ++++++++++------ .../scala/org/apache/s2graph/core/S2Graph.scala | 19 +++++++++--------- .../org/apache/s2graph/core/S2GraphLike.scala | 20 +++++++++---------- .../org/apache/s2graph/core/VertexFetcher.scala | 2 +- .../apache/s2graph/core/rest/RestHandler.scala | 7 ++++--- .../core/storage/hbase/AsynchbaseStorage.scala | 2 +- .../storage/hbase/AsynchbaseVertexFetcher.scala | 21 ++++++++++---------- .../core/storage/rocks/RocksVertexFetcher.scala | 5 +++-- .../s2graph/graphql/types/FieldResolver.scala | 4 ++-- 9 files changed, 52 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 21aca12..cb1434f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -294,14 +294,19 @@ object QueryParam { } object VertexQueryParam { - def Empty: VertexQueryParam = VertexQueryParam(0, 1, None) + def Empty: VertexQueryParam = VertexQueryParam(Nil, 0, 1, None) + + def apply(vertexIds: Seq[VertexId]): VertexQueryParam = { + VertexQueryParam(vertexIds) + } } -case class VertexQueryParam(offset: Int, - limit: Int, - searchString: Option[String], - vertexIds: Seq[VertexId] = Nil, - fetchProp: Boolean = true) { +case class VertexQueryParam(vertexIds: Seq[VertexId], + offset: Int = 0, + limit: Int = 1, + searchString: Option[String] = None, + fetchProp: Boolean = true, + where: Try[Where] = Success(WhereParser.success)) { } case class QueryParam(labelName: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index d41bc24..f3010f7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -335,18 +335,19 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } def searchVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = { - val matchedVertices = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids => + val matchedVerticesFuture = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids => (queryParam.vertexIds ++ vids).distinct.map(vid => elementBuilder.newVertex(vid)) } - if (queryParam.fetchProp) matchedVertices.flatMap(vs => getVertices(vs)) - else matchedVertices + if (queryParam.fetchProp) matchedVerticesFuture.flatMap(vs => getVertices(queryParam.copy(vertexIds = vs.map(_.id)))) + else matchedVerticesFuture } - override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = { - val verticesWithIdx = vertices.zipWithIndex - val futures = verticesWithIdx.groupBy { case (v, idx) => v.serviceColumn }.map { case (serviceColumn, vertexGroup) => - getVertexFetcher(serviceColumn).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2))) + override def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = { + val vertexIdsWithIdx = queryParam.vertexIds.zipWithIndex + val futures = vertexIdsWithIdx.groupBy { case (vId, idx) => vId.column }.map { case (serviceColumn, vertexGroup) => + val (vertexIds, indices) = vertexGroup.unzip + getVertexFetcher(serviceColumn).fetchVertices(queryParam.copy(vertexIds = vertexIds)).map(_.zip(indices)) } Future.sequence(futures).map { ls => @@ -562,8 +563,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } override def getVertex(vertexId: VertexId): Option[S2VertexLike] = { - val v = elementBuilder.newVertex(vertexId) - Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout) + val queryParam = VertexQueryParam(vertexIds = Seq(vertexId)) + Await.result(getVertices(queryParam).map { vertices => vertices.headOption }, WaitTimeout) } override def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index fb229ec..cab866d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -110,10 +110,7 @@ trait S2GraphLike extends Graph { def shutdown(modelDataDelete: Boolean = false): Unit - def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] - - def getVerticesJava(vertices: util.List[S2VertexLike]): CompletableFuture[util.List[S2VertexLike]] = - getVertices(vertices.toSeq).map(_.asJava).toJava.toCompletableFuture + def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] @@ -217,22 +214,23 @@ trait S2GraphLike extends Graph { Await.result(future, WaitTimeout).flatten.iterator } else { - val vertices = ids.collect { - case s2Vertex: S2VertexLike => s2Vertex - case vId: VertexId => elementBuilder.newVertex(vId) - case vertex: Vertex => elementBuilder.newVertex(vertex.id().asInstanceOf[VertexId]) - case other@_ => elementBuilder.newVertex(VertexId.fromString(other.toString)) + val vertexIds = ids.collect { + case s2Vertex: S2VertexLike => s2Vertex.id + case vId: VertexId => vId + case vertex: Vertex => vertex.id().asInstanceOf[VertexId] + case other@_ => VertexId.fromString(other.toString) } if (fetchVertices) { - val future = getVertices(vertices).map { vs => + val queryParam = VertexQueryParam(vertexIds = vertexIds) + val future = getVertices(queryParam).map { vs => val ls = new util.ArrayList[structure.Vertex]() ls.addAll(vs) ls.iterator() } Await.result(future, WaitTimeout) } else { - vertices.iterator + vertexIds.map(vId => elementBuilder.newVertex(vId)).iterator } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala index 4addcab..d6c2e64 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala @@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future} trait VertexFetcher extends AutoCloseable { def init(config: Config)(implicit ec: ExecutionContext): Unit = {} - def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] + def fetchVertices(vertexQueryParam: VertexQueryParam)(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala index c768d81..dfdeca4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -212,18 +212,19 @@ class RestHandler(graph: S2GraphLike)(implicit ec: ExecutionContext) { private def getVertices(jsValue: JsValue) = { val jsonQuery = jsValue - val vertices = jsonQuery.as[List[JsValue]].flatMap { js => + val vertexIds = jsonQuery.as[List[JsValue]].flatMap { js => val serviceName = (js \ "serviceName").as[String] val columnName = (js \ "columnName").as[String] for { idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue]) id <- jsValueToAny(idJson) } yield { - graph.toVertex(serviceName, columnName, id) + graph.elementBuilder.newVertexId(serviceName)(columnName)(id) } } + val queryParam = VertexQueryParam(vertexIds = vertexIds) - graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) } + graph.getVertices(queryParam) map { vertices => PostProcess.verticesToJson(vertices) } } private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 89303e6..f670e9c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -202,7 +202,7 @@ object AsynchbaseStorage { } } - def buildRequest(serDe: StorageSerDe, queryRequest: QueryRequest, vertex: S2VertexLike) = { + def buildRequest(serDe: StorageSerDe, vertex: S2VertexLike) = { val kvs = serDe.vertexSerializer(vertex).toKeyValues val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) // get.setTimeout(this.singleGetTimeout.toShort) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala index f16c8e9..4815bf3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala @@ -21,6 +21,7 @@ package org.apache.s2graph.core.storage.hbase import com.typesafe.config.Config import org.apache.s2graph.core._ +import org.apache.s2graph.core.parsers.Where import org.apache.s2graph.core.schema.ServiceColumn import org.apache.s2graph.core.storage.serde.Serializable import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} @@ -40,23 +41,23 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike, import scala.collection.JavaConverters._ - private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { - val rpc = buildRequest(serDe, queryRequest, vertex) + private def fetchKeyValues(vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + val rpc = buildRequest(serDe, vertex) AsynchbaseStorage.fetchKeyValues(client, rpc) } - override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { - def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = { + override def fetchVertices(vertexQueryParam: VertexQueryParam)(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { + def fromResult(kvs: Seq[SKeyValue], + version: String): Seq[S2VertexLike] = { if (kvs.isEmpty) Nil - else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq + else { + serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq.filter(vertexQueryParam.where.get.filter) + } } + val vertices = vertexQueryParam.vertexIds.map(vId => graph.elementBuilder.newVertex(vId)) val futures = vertices.map { vertex => - val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - - fetchKeyValues(queryRequest, vertex).map { kvs => + fetchKeyValues(vertex).map { kvs => fromResult(kvs, vertex.serviceColumn.schemaVersion) } recoverWith { case ex: Throwable => Future.successful(Nil) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala index 2d3880c..94a0da6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala @@ -43,11 +43,12 @@ class RocksVertexFetcher(val graph: S2GraphLike, RocksStorage.fetchKeyValues(vdb, db, rpc) } - override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { + override def fetchVertices(vertexQueryParam: VertexQueryParam)(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = { if (kvs.isEmpty) Nil - else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq + else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq.filter(vertexQueryParam.where.get.filter) } + val vertices = vertexQueryParam.vertexIds.map(vId => graph.elementBuilder.newVertex(vId)) val futures = vertices.map { vertex => val queryParam = QueryParam.Empty http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f47622fd/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala index 478517f..6e1a7cc 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala @@ -87,7 +87,7 @@ object FieldResolver { val selectedFields = AstHelper.selectedFields(c.astFields) val canSkipFetch = selectedFields.forall(f => f == "id" || !columnFields(f)) - val vertexQueryParam = VertexQueryParam(offset, limit, searchOpt, vertices.map(_.id), !canSkipFetch) + val vertexQueryParam = VertexQueryParam(vertices.map(_.id), offset, limit, searchOpt, !canSkipFetch) vertexQueryParam } @@ -102,7 +102,7 @@ object FieldResolver { val columnFields = column.metasInvMap.keySet val canSkipFetch = selectedFields.forall(f => f == "id" || !columnFields(f)) - val vertexQueryParam = VertexQueryParam(0, 1, None, Seq(vertex.id), !canSkipFetch) + val vertexQueryParam = VertexQueryParam(Seq(vertex.id), 0, 1, None, !canSkipFetch) vertexQueryParam }
