add search vertices
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ba938bc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ba938bc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ba938bc0 Branch: refs/heads/master Commit: ba938bc087898ba6221bbb53ea62e0ec12185fe3 Parents: 3ac20b4 Author: daewon <[email protected]> Authored: Thu Apr 19 18:13:34 2018 +0900 Committer: daewon <[email protected]> Committed: Thu Apr 19 18:13:34 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/QueryParam.scala | 91 +++++++++------- .../scala/org/apache/s2graph/core/S2Graph.scala | 33 ++++-- .../s2graph/core/index/ESIndexProvider.scala | 67 ++++++------ .../s2graph/core/index/IndexProvider.scala | 1 + .../core/index/LuceneIndexProvider.scala | 107 +++++++++++-------- .../s2graph/core/index/IndexProviderTest.scala | 23 ++-- .../graphql/repository/GraphRepository.scala | 20 ++-- .../s2graph/graphql/types/FieldResolver.scala | 2 + .../apache/s2graph/graphql/types/S2Type.scala | 27 ++++- 9 files changed, 229 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 748e8c5..faf04db 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -37,9 +37,11 @@ import scala.util.{Success, Try} object Query { val initialScore = 1.0 lazy val empty = Query() + def apply(query: Query): Query = { Query(query.vertices, query.steps, query.queryOption, query.jsonQuery) } + def toQuery(srcVertices: Seq[S2VertexLike], queryParams: Seq[QueryParam]) = Query(srcVertices, Vector(Step(queryParams))) } @@ -49,9 +51,10 @@ case class MinShouldMatchParam(prop: String, count: Int, terms: Set[Any]) object GroupBy { val Empty = GroupBy() } + case class GroupBy(keys: Seq[String] = Nil, limit: Int = Int.MaxValue, - minShouldMatch: Option[MinShouldMatchParam]= None) + minShouldMatch: Option[MinShouldMatchParam] = None) case class MultiQuery(queries: Seq[Query], weights: Seq[Double], @@ -79,12 +82,12 @@ case class QueryOption(removeCycle: Boolean = false, ignorePrevStepCache: Boolean = false) { val orderByKeys = orderByColumns.map(_._1) val ascendingVals = orderByColumns.map(_._2) - val selectColumnsMap = selectColumns.map { c => c -> true } .toMap + val selectColumnsMap = selectColumns.map { c => c -> true }.toMap val scoreFieldIdx = orderByKeys.zipWithIndex.find(t => t._1 == "score").map(_._2).getOrElse(-1) val (edgeSelectColumns, propsSelectColumns) = selectColumns.partition(c => LabelMeta.defaultRequiredMetaNames.contains(c)) /** */ val edgeSelectColumnsFiltered = edgeSelectColumns -// val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c)) + // val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c)) lazy val cacheKeyBytes: Array[Byte] = { val selectBytes = Bytes.toBytes(selectColumns.toString) val groupBytes = Bytes.toBytes(groupBy.keys.toString) @@ -118,9 +121,10 @@ object EdgeTransformer { } /** - * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold. - * @param jsValue - */ + * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold. + * + * @param jsValue + */ case class EdgeTransformer(jsValue: JsValue) { val Delimiter = "\\$" val targets = jsValue.asOpt[List[Vector[String]]].toList @@ -204,13 +208,15 @@ case class Step(queryParams: Seq[QueryParam], cacheTTL: Long = -1, groupBy: GroupBy = GroupBy.Empty) { -// lazy val excludes = queryParams.filter(_.exclude) -// lazy val includes = queryParams.filterNot(_.exclude) -// lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap + // lazy val excludes = queryParams.filter(_.exclude) + // lazy val includes = queryParams.filterNot(_.exclude) + // lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap lazy val cacheKeyBytes = queryParams.map(_.toCacheKeyRaw(Array.empty[Byte])).foldLeft(Array.empty[Byte])(Bytes.add) + def toCacheKey(lss: Seq[Long]): Long = Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong() -// MurmurHash3.bytesHash(toCacheKeyRaw(lss)) + + // MurmurHash3.bytesHash(toCacheKeyRaw(lss)) def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = { var bytes = Array.empty[Byte] @@ -277,6 +283,7 @@ object QueryParam { val Delimiter = "," val maxMetaByte = (-1).toByte val fillArray = Array.fill(100)(maxMetaByte) + import scala.collection.JavaConverters._ def apply(labelWithDirection: LabelWithDirection): QueryParam = { @@ -285,34 +292,43 @@ object QueryParam { QueryParam(labelName = label.label, direction = direction) } } + +case class VertexQueryParam(offset: Int, + limit: Int, + searchString: Option[String], + vertexIds: Seq[VertexId] = Nil, + fetchProp: Boolean = true) { +} + case class QueryParam(labelName: String, - direction: String = "out", - offset: Int = 0, - limit: Int = S2Graph.DefaultFetchLimit, - sample: Int = -1, - maxAttempt: Int = 20, - rpcTimeout: Int = 600000, - cacheTTLInMillis: Long = -1L, - indexName: String = LabelIndex.DefaultName, - where: Try[Where] = Success(WhereParser.success), - timestamp: Long = System.currentTimeMillis(), - threshold: Double = Double.MinValue, - rank: RankParam = RankParam.Default, - intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None, - durationOpt: Option[(Long, Long)] = None, - exclude: Boolean = false, - include: Boolean = false, - has: Map[String, Any] = Map.empty, - duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First, - includeDegree: Boolean = false, - scorePropagateShrinkage: Long = 500L, - scorePropagateOp: String = "multiply", - shouldNormalize: Boolean = false, - whereRawOpt: Option[String] = None, - cursorOpt: Option[String] = None, - tgtVertexIdOpt: Option[Any] = None, - edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson), - timeDecay: Option[TimeDecay] = None) { + direction: String = "out", + offset: Int = 0, + limit: Int = S2Graph.DefaultFetchLimit, + sample: Int = -1, + maxAttempt: Int = 20, + rpcTimeout: Int = 600000, + cacheTTLInMillis: Long = -1L, + indexName: String = LabelIndex.DefaultName, + where: Try[Where] = Success(WhereParser.success), + timestamp: Long = System.currentTimeMillis(), + threshold: Double = Double.MinValue, + rank: RankParam = RankParam.Default, + intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None, + durationOpt: Option[(Long, Long)] = None, + exclude: Boolean = false, + include: Boolean = false, + has: Map[String, Any] = Map.empty, + duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First, + includeDegree: Boolean = false, + scorePropagateShrinkage: Long = 500L, + scorePropagateOp: String = "multiply", + shouldNormalize: Boolean = false, + whereRawOpt: Option[String] = None, + cursorOpt: Option[String] = None, + tgtVertexIdOpt: Option[Any] = None, + edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson), + timeDecay: Option[TimeDecay] = None) { + import JSONParser._ //TODO: implement this. @@ -447,6 +463,7 @@ object DuplicatePolicy extends Enumeration { } } } + case class TimeDecay(initial: Double = 1.0, lambda: Double = 0.1, timeUnit: Double = 60 * 60 * 24, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 09f9c7c..2dc9f63 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -33,7 +33,8 @@ import org.apache.s2graph.core.storage.rocks.RocksStorage import org.apache.s2graph.core.storage.{MutateResponse, Storage} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies +import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies} +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph} import scala.collection.JavaConversions._ @@ -58,7 +59,7 @@ object S2Graph { "hbase.table.name" -> "s2graph", "hbase.table.compression.algorithm" -> "gz", "phase" -> "dev", - "db.default.driver" -> "org.h2.Driver", + "db.default.driver" -> "org.h2.Driver", "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL", "db.default.password" -> "graph", "db.default.user" -> "graph", @@ -110,7 +111,7 @@ object S2Graph { } { m.put(key, value) } - val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig) + val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig) config } @@ -134,7 +135,7 @@ object S2Graph { storageBackend match { case "hbase" => - hbaseExecutor = + hbaseExecutor = if (config.getString("hbase.zookeeper.quorum") == "localhost") AsynchbaseStorage.initLocalHBase(config) else @@ -176,7 +177,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override val config = _config.withFallback(S2Graph.DefaultConfig) - val storageBackend = Try { config.getString("s2graph.storage.backend") }.getOrElse("hbase") + val storageBackend = Try { + config.getString("s2graph.storage.backend") + }.getOrElse("hbase") Model.apply(config) Model.loadCache() @@ -260,6 +263,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap localLongId.set(0l) } + def searchVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = { + val matchedVertices = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids => + (queryParam.vertexIds ++ vids).distinct.map(vid => elementBuilder.newVertex(vid)) + } + + if (queryParam.fetchProp) matchedVertices.flatMap(getVertices) + else matchedVertices + } + override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = { val verticesWithIdx = vertices.zipWithIndex val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => @@ -289,7 +301,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { - val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) } + val futures = vertices.map { vertex => + storage.mutateVertex(zkQuorum, vertex, withWait) + } Future.sequence(futures) } @@ -297,7 +311,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) } - Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } + + Future.sequence(futures).flatMap { ls => + indexProvider.mutateVerticesAsync(vertices).map { _ => + ls.flatten.toSeq.sortBy(_._2).map(_._1) + } + } } override def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala index 10c9222..fbf76ef 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala @@ -25,12 +25,13 @@ import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType} import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.HttpClient import com.typesafe.config.Config +import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait.Predicate import org.apache.s2graph.core.io.Conversions import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.types.VertexId -import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam} import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer -import play.api.libs.json.Json +import play.api.libs.json.{Json, Reads} import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration @@ -38,6 +39,7 @@ import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Try class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider { + import GlobalIndex._ import IndexProvider._ @@ -103,10 +105,10 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { val bulkRequests = vertices.flatMap { vertex => - toFields(vertex, forceToIndex).toSeq.map { fields => - update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields) - } + toFields(vertex, forceToIndex).toSeq.map { fields => + update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields) } + } if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true)) else { @@ -149,61 +151,56 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind } } - override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] = - Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime) - - override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = { - val field = eidField - val ids = new java.util.HashSet[EdgeId] - - val queryString = buildQueryString(hasContainers) + private def fetchInner[T](queryString: String, indexKey: String, field: String, reads: Reads[T])(validate: (T => Boolean)): Future[util.List[T]] = { + val ids = new java.util.HashSet[T] client.execute { - search(GlobalIndex.EdgeIndexName).query(queryString) + search(indexKey).query(queryString) }.map { ret => ret match { case Left(failure) => case Right(results) => results.result.hits.hits.foreach { searchHit => searchHit.sourceAsMap.get(field).foreach { idValue => - val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get - + val id = reads.reads(Json.parse(idValue.toString)).get //TODO: Come up with better way to filter out hits with invalid meta. - EdgeId.isValid(id).foreach(ids.add) + if (validate(id)) ids.add(id) } } } - new util.ArrayList[EdgeId](ids) + new util.ArrayList(ids) } } + override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] = + Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime) + + override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = { + val field = eidField + + val queryString = buildQueryString(hasContainers) + fetchInner[EdgeId](queryString, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)(e => EdgeId.isValid(e).isDefined) + } override def fetchVertexIds(hasContainers: util.List[HasContainer]): util.List[VertexId] = Await.result(fetchVertexIdsAsync(hasContainers), WaitTime) override def fetchVertexIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[VertexId]] = { val field = vidField - val ids = new java.util.HashSet[VertexId] - val queryString = buildQueryString(hasContainers) - client.execute { - search(GlobalIndex.VertexIndexName).query(queryString) - }.map { ret => - ret match { - case Left(failure) => - case Right(results) => - results.result.hits.hits.foreach { searchHit => - searchHit.sourceAsMap.get(field).foreach { idValue => - val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get - //TODO: Come up with better way to filter out hits with invalid meta. - VertexId.isValid(id).foreach(ids.add) - } - } - } + fetchInner[VertexId](queryString, GlobalIndex.VertexIndexName, field, Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined) + } + + override def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[util.List[VertexId]] = { + val field = vidField + val empty = new util.ArrayList[VertexId]() - new util.ArrayList[VertexId](ids) + vertexQueryParam.searchString match { + case Some(queryString) => + fetchInner[VertexId](queryString, GlobalIndex.VertexIndexName, field, Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined) + case None => Future.successful(empty) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index ae632df..ffbebf4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -121,6 +121,7 @@ trait IndexProvider { def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]] + def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[java.util.List[VertexId]] = Future.successful(util.Arrays.asList()) def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala index 841331d..68d481c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.core.index +import java.io.File import java.util import com.typesafe.config.Config @@ -26,20 +27,21 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.document.{Document, Field, StringField} import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term} import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} -import org.apache.lucene.search.IndexSearcher -import org.apache.lucene.store.{BaseDirectory, RAMDirectory} +import org.apache.lucene.search.{IndexSearcher, Query} +import org.apache.lucene.store.{BaseDirectory, RAMDirectory, SimpleFSDirectory} import org.apache.s2graph.core.io.Conversions import org.apache.s2graph.core.mysqls.GlobalIndex import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam} import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer -import play.api.libs.json.Json +import play.api.libs.json.{Json, Reads} import scala.concurrent.Future class LuceneIndexProvider(config: Config) extends IndexProvider { + import GlobalIndex._ import IndexProvider._ @@ -49,10 +51,19 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { val analyzer = new StandardAnalyzer() val writers = mutable.Map.empty[String, IndexWriter] val directories = mutable.Map.empty[String, BaseDirectory] + val baseDirectory = scala.util.Try(config.getString("index.provider.base.dir")).getOrElse(".") + + private def getOrElseDirectory(indexName: String): BaseDirectory = { + val pathname = s"${baseDirectory}/${indexName}" + val dir = directories.getOrElseUpdate(indexName, new SimpleFSDirectory(new File(pathname).toPath)) + + dir + } private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = { writers.getOrElseUpdate(indexName, { - val dir = directories.getOrElseUpdate(indexName, new RAMDirectory()) + val dir = getOrElseDirectory(indexName) + val indexConfig = new IndexWriterConfig(analyzer) new IndexWriter(dir, indexConfig) }) @@ -124,8 +135,9 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { vertices.foreach { vertex => toDocument(vertex, forceToIndex).foreach { doc => val vId = vertex.id.toString() +// logger.error(s"DOC: ${doc}") - writer.updateDocument(new Term(IdField, vId), doc) + writer.updateDocument(new Term(vidField, vId), doc) } } @@ -144,7 +156,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { toDocument(edge, forceToIndex).foreach { doc => val eId = edge.edgeId.toString - writer.updateDocument(new Term(IdField, eId), doc) + writer.updateDocument(new Term(eidField, eId), doc) } } @@ -153,73 +165,84 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { edges.map(_ => true) } - override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { - val field = eidField - val ids = new java.util.HashSet[EdgeId] - - val queryString = buildQueryString(hasContainers) + private def fetchInner[T](q: Query, indexKey: String, field: String, reads: Reads[T]): util.List[T] = { + val ids = new java.util.HashSet[T] + var reader: DirectoryReader = null try { - val q = new QueryParser(field, analyzer).parse(queryString) - - val reader = DirectoryReader.open(directories(GlobalIndex.EdgeIndexName)) + val reader = DirectoryReader.open(getOrElseDirectory(indexKey)) val searcher = new IndexSearcher(reader) val docs = searcher.search(q, hitsPerPage) + logger.error(s"total hit: ${docs.scoreDocs.length}") docs.scoreDocs.foreach { scoreDoc => val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get - ids.add(id); - } + logger.error(s"DOC_IN_L: ${document.toString}") - reader.close() - ids + val id = reads.reads(Json.parse(document.get(field))).get + ids.add(id) + } } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids + case e: org.apache.lucene.index.IndexNotFoundException => logger.info("Index file not found.") + } finally { + if (reader != null) reader.close() } - new util.ArrayList[EdgeId](ids) + new util.ArrayList[T](ids) } override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { val field = vidField - val ids = new java.util.HashSet[VertexId] val queryString = buildQueryString(hasContainers) try { val q = new QueryParser(field, analyzer).parse(queryString) + fetchInner[VertexId](q, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads) + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + util.Arrays.asList[VertexId]() + } + } - val reader = DirectoryReader.open(directories(GlobalIndex.VertexIndexName)) - val searcher = new IndexSearcher(reader) - - val docs = searcher.search(q, hitsPerPage) - - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get - ids.add(id) - } + override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { + val field = eidField + val queryString = buildQueryString(hasContainers) - reader.close() - ids + try { + val q = new QueryParser(field, analyzer).parse(queryString) + fetchInner[EdgeId](q, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads) } catch { case ex: ParseException => logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids + util.Arrays.asList[EdgeId]() } + } + + override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) - new util.ArrayList[VertexId](ids) + override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) + + override def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[util.List[VertexId]] = { + val ret = vertexQueryParam.searchString.fold(util.Arrays.asList[VertexId]()) { queryString => + val field = vidField + try { + val q = new QueryParser(field, analyzer).parse(queryString) + fetchInner[VertexId](q, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads) + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + util.Arrays.asList[VertexId]() + } + } + + Future.successful(ret) } override def shutdown(): Unit = { writers.foreach { case (_, writer) => writer.close() } } - override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) - - override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala index 4889412..a5349dc 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala @@ -29,6 +29,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper import org.apache.tinkerpop.gremlin.structure.T import scala.collection.JavaConversions._ +import scala.concurrent._ +import scala.concurrent.duration._ class IndexProviderTest extends IntegrateCommon { import scala.concurrent.ExecutionContext.Implicits.global @@ -74,14 +76,19 @@ class IndexProviderTest extends IntegrateCommon { Thread.sleep(1000) (0 until numOfTry).foreach { ith => - val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1))) - - var ids = indexProvider.fetchVertexIds(Seq(hasContainer)) - ids.head shouldBe vertex.id - - ids.foreach { id => - println(s"[Id]: $id") - } +// val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1))) + val hasContainer = new HasContainer(GlobalIndex.serviceColumnField, P.eq(testColumn.columnName)) + + val f = graph.searchVertices(VertexQueryParam(0, 100, Option(s"${GlobalIndex.serviceColumnField}:${testColumn.columnName}"))) + val a = Await.result(f, Duration("60 sec")) + println(a) + +// var ids = indexProvider.fetchVertexIds(Seq(hasContainer)) +// ids.head shouldBe vertex.id +// +// ids.foreach { id => +// println(s"[Id]: $id") +// } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala index c7ffae1..1f53c76 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala @@ -30,13 +30,14 @@ import org.slf4j.{Logger, LoggerFactory} import sangria.execution.deferred._ import sangria.schema._ +import scala.collection.immutable import scala.concurrent._ import scala.util.{Failure, Success, Try} object GraphRepository { - implicit val vertexHasId = new HasId[S2VertexLike, S2VertexLike] { - override def id(value: S2VertexLike): S2VertexLike = value + implicit val vertexHasId = new HasId[(VertexQueryParam, Seq[S2VertexLike]), VertexQueryParam] { + override def id(value: (VertexQueryParam, Seq[S2VertexLike])): VertexQueryParam = value._1 } implicit val edgeHasId = new HasId[(S2VertexLike, QueryParam, Seq[S2EdgeLike]), DeferFetchEdges] { @@ -44,9 +45,12 @@ object GraphRepository { DeferFetchEdges(value._1, value._2) } - val vertexFetcher = Fetcher((ctx: GraphRepository, ids: Seq[S2VertexLike]) => { - ctx.getVertices(ids) - }) + val vertexFetcher = + Fetcher((ctx: GraphRepository, ids: Seq[VertexQueryParam]) => { + implicit val ec = ctx.ec + + Future.traverse(ids)(ctx.getVertices).map(vs => ids.zip(vs)) + }) val edgeFetcher = Fetcher((ctx: GraphRepository, ids: Seq[DeferFetchEdges]) => { implicit val ec = ctx.ec @@ -58,7 +62,7 @@ object GraphRepository { } val f: Future[Iterable[(QueryParam, Seq[S2EdgeLike])]] = Future.sequence(edgesByParam) - val grouped = f.map { tpLs => + val grouped: Future[Seq[(S2VertexLike, QueryParam, Seq[S2EdgeLike])]] = f.map { tpLs => tpLs.toSeq.flatMap { case (qp, edges) => edges.groupBy(_.srcForVertex).map { case (v, edges) => (v, qp, edges) } } @@ -121,8 +125,8 @@ class GraphRepository(val graph: S2GraphLike) { graph.mutateEdges(edges, withWait = true) } - def getVertices(vertex: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = { - graph.getVertices(vertex) + def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = { + graph.asInstanceOf[S2Graph].searchVertices(queryParam) } def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala index 4f092dd..64650d3 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala @@ -53,6 +53,8 @@ object FieldResolver { val ids = c.argOpt[Any]("id").toSeq ++ c.argOpt[List[Any]]("ids").toList.flatten val vertices = ids.map(vid => c.ctx.toS2VertexLike(vid, column)) + val search = c.argOpt[String]("search") + val columnFields = column.metasInvMap.keySet val selectedFields = AstHelper.selectedFields(c.astFields) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala index 9066d21..4ba6680 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala @@ -138,15 +138,31 @@ object S2Type { ListType(ColumnType), arguments = List( Argument("id", OptionInputType(toScalarType(column.columnType))), - Argument("ids", OptionInputType(ListInputType(toScalarType(column.columnType)))) + Argument("ids", OptionInputType(ListInputType(toScalarType(column.columnType)))), + Argument("search", OptionInputType(StringType)) ), description = Option("desc here"), resolve = c => { implicit val ec = c.ctx.ec val (vertices, canSkipFetchVertex) = FieldResolver.serviceColumnOnService(column, c) + val searchOpt = c.argOpt[String]("search").map { qs => + val prefix = s"${GlobalIndex.serviceField}:${service.serviceName} AND ${GlobalIndex.serviceColumnField}:${column.columnName}" + if (qs.trim.nonEmpty) Seq(prefix, qs).mkString(" AND ") + else prefix + qs + } + + println(searchOpt) + + val vertexQueryParam = VertexQueryParam(0, 100, searchOpt, vertices.map(_.id)) + +// if (canSkipFetchVertex) Future.successful(vertices) +// else GraphRepository.vertexFetcher.deferSeq(deferVertices) + +// val empty = Seq.empty[S2VertexLike] +// DeferredValue(GraphRepository.vertexFetcher.defer(vertexQueryParam)).map(m => m._2) - if (canSkipFetchVertex) Future.successful(vertices) - else GraphRepository.vertexFetcher.deferSeq(vertices) + c.ctx.getVertices(vertexQueryParam) } ): Field[GraphRepository, Any] } @@ -173,8 +189,9 @@ object S2Type { implicit val ec = c.ctx.ec val (vertex, canSkipFetchVertex) = FieldResolver.serviceColumnOnLabel(c) - if (canSkipFetchVertex) Future.successful(vertex) - else GraphRepository.vertexFetcher.defer(vertex) + // if (canSkipFetchVertex) Future.successful(vertex) + // else GraphRepository.vertexFetcher.defer(vertex) + Future.successful(vertex) }) lazy val EdgeType = ObjectType(
