Repository: incubator-s2graph Updated Branches: refs/heads/master 3cc98da8a -> 8f9214e82
[S2GRAPH-175]: Provide Elastic Search Index Provider. - add ESIndexProvider. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/939e5f3b Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/939e5f3b Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/939e5f3b Branch: refs/heads/master Commit: 939e5f3b5b92d11c47e8b60e824464378530cd2a Parents: 3cc98da Author: DO YUNG YOON <[email protected]> Authored: Tue Jan 30 11:58:05 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Jan 30 11:58:24 2018 +0900 ---------------------------------------------------------------------- project/Common.scala | 2 + s2core/build.sbt | 5 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 21 +- .../org/apache/s2graph/core/S2GraphLike.scala | 11 +- .../s2graph/core/index/ESIndexProvider.scala | 206 +++++++++++++++++ .../s2graph/core/index/IndexProvider.scala | 206 +---------------- .../core/index/LuceneIndexProvider.scala | 208 +++++++++++++++++ .../s2graph/core/mysqls/GlobalIndex.scala | 2 + .../apache/s2graph/core/types/VertexId.scala | 12 +- .../core/Integrate/IntegrateCommon.scala | 8 +- .../s2graph/core/index/IndexProviderTest.scala | 226 ++++++++++--------- .../core/tinkerpop/S2GraphProvider.scala | 1 + 12 files changed, 577 insertions(+), 331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/project/Common.scala ---------------------------------------------------------------------- diff --git a/project/Common.scala b/project/Common.scala index b965aed..d3b8d93 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -28,6 +28,8 @@ object Common { val hadoopVersion = "2.7.3" val tinkerpopVersion = "3.2.5" + val elastic4sVersion = "6.1.0" + /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */ val loggingRuntime = Seq( "log4j" % "log4j" % "1.2.17", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 804c57c..110d5e5 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -50,7 +50,10 @@ libraryDependencies ++= Seq( "org.apache.lucene" % "lucene-core" % "6.6.0", "org.apache.lucene" % "lucene-queryparser" % "6.6.0", "org.rocksdb" % "rocksdbjni" % "5.8.0", - "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0" + "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0", + "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(), + "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(), + "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging() ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 3eb1fa7..f52acf3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -377,20 +377,19 @@ case class S2Edge(override val innerGraph: S2GraphLike, object EdgeId { val EdgeIdDelimiter = "," def fromString(s: String): EdgeId = { -// val Array(src, tgt, labelName, dir, ts) = s.split(EdgeIdDelimiter) -// val label = Label.findByName(labelName).getOrElse(throw LabelNotExistException(labelName)) -// val srcColumn = label.srcColumnWithDir(GraphUtil.toDirection(dir)) -// val tgtColumn = label.tgtColumnWithDir(GraphUtil.toDirection(dir)) -// EdgeId( -// JSONParser.toInnerVal(src, srcColumn.columnType, label.schemaVersion), -// JSONParser.toInnerVal(tgt, tgtColumn.columnType, label.schemaVersion), -// labelName, -// dir, -// ts.toLong -// ) val js = Json.parse(s) s2EdgeIdReads.reads(Json.parse(s)).get } + + def isValid(edgeId: EdgeId): Option[EdgeId] = { + VertexId.isValid(edgeId.srcVertexId).flatMap { _ => + VertexId.isValid(edgeId.tgtVertexId).flatMap { _ => + Label.findByName(edgeId.labelName).map { _ => + edgeId + } + } + } + } } case class EdgeId(srcVertexId: VertexId, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index bb36a33..f639e84 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -298,8 +298,15 @@ trait S2GraphLike extends Graph { belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { val vertex = elementBuilder.newVertex(id, ts, props, op, belongLabelIds) - val future = mutateVertices(Seq(vertex), withWait = true).map { rets => - if (rets.forall(_.isSuccess)) vertex + val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets => + if (rets.forall(_.isSuccess)) { + indexProvider.mutateVerticesAsync(Seq(vertex)).map { ls => + if (ls.forall(identity)) vertex + else { + throw new RuntimeException("indexVertex failed.") + } + } + } else throw new RuntimeException("addVertex failed.") } Await.ready(future, WaitTimeout) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala new file mode 100644 index 0000000..6486fa9 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala @@ -0,0 +1,206 @@ +package org.apache.s2graph.core.index + +import java.util + +import com.sksamuel.elastic4s.ElasticsearchClientUri +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.HttpClient +import com.typesafe.config.Config +import org.apache.s2graph.core.io.Conversions +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer +import play.api.libs.json.Json + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + +class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider { + import GlobalIndex._ + import IndexProvider._ + + import scala.collection.mutable + + implicit val executor = ec + + val esClientUri = "localhost" +// // config.getString("es.index.provider.client.uri") + val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200)) +// val node = LocalNode("elasticsearch", "./es") +// val client = node.http(true) + + val WaitTime = Duration("60 seconds") + + private def toFields(globalIndex: GlobalIndex, vertex: S2VertexLike): Option[Map[String, Any]] = { + val props = vertex.props.asScala + val filtered = props.filterKeys(globalIndex.propNamesSet) + + if (filtered.isEmpty) None + else { + val fields = mutable.Map.empty[String, Any] + + fields += (vidField -> vertex.id.toString()) + fields += (serviceField -> vertex.serviceName) + fields += (serviceColumnField -> vertex.columnName) + + filtered.foreach { case (dim, s2VertexProperty) => + s2VertexProperty.columnMeta.dataType match { + case "string" => fields += (dim -> s2VertexProperty.innerVal.value.toString) + case _ => fields += (dim -> s2VertexProperty.innerVal.value) + } + } + + Option(fields.toMap) + } + } + + private def toFields(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Map[String, Any]] = { + val props = edge.getPropsWithTs().asScala + val filtered = props.filterKeys(globalIndex.propNamesSet) + + if (filtered.isEmpty) None + else { + val fields = mutable.Map.empty[String, Any] + + fields += (eidField -> edge.edgeId.toString) + fields += (serviceField -> edge.serviceName) + fields += (labelField -> edge.label()) + + filtered.foreach { case (dim, s2Property) => + s2Property.labelMeta.dataType match { + case "string" => fields += (dim -> s2Property.innerVal.value.toString) + case _ => fields += (dim -> s2Property.innerVal.value) + } + } + + Option(fields.toMap) + } + } + + override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = { + val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType) + + val bulkRequests = globalIndexOptions.flatMap { globalIndex => + vertices.flatMap { vertex => + toFields(globalIndex, vertex).toSeq.map { fields => + indexInto(globalIndex.backendIndexNameWithType).fields(fields) + } + } + } + if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true)) + else { + client.execute { + val requests = bulk(requests = bulkRequests) + + requests + }.map { ret => + ret match { + case Left(failure) => vertices.map(_ => false) + case Right(results) => vertices.map(_ => true) + } + } + } + } + + override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = + Await.result(mutateVerticesAsync(vertices), WaitTime) + + override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = + Await.result(mutateEdgesAsync(edges), WaitTime) + + override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = { + val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) + + val bulkRequests = globalIndexOptions.flatMap { globalIndex => + edges.flatMap { edge => + toFields(globalIndex, edge).toSeq.map { fields => + indexInto(globalIndex.backendIndexNameWithType).fields(fields) + } + } + } + + if (bulkRequests.isEmpty) Future.successful(edges.map(_ => true)) + else { + client.execute { + bulk(bulkRequests) + }.map { ret => + ret match { + case Left(failure) => edges.map(_ => false) + case Right(results) => edges.map(_ => true) + } + } + } + } + + override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] = + Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime) + + override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = { + val field = eidField + val ids = new java.util.HashSet[EdgeId] + + GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers) match { + case None => Future.successful(new util.ArrayList[EdgeId](ids)) + case Some(globalIndex) => + val queryString = buildQueryString(hasContainers) + + client.execute { + search(globalIndex.backendIndexName).query(queryString) + }.map { ret => + ret match { + case Left(failure) => + case Right(results) => + results.result.hits.hits.foreach { searchHit => + searchHit.sourceAsMap.get(field).foreach { idValue => + val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get + + //TODO: Come up with better way to filter out hits with invalid meta. + EdgeId.isValid(id).foreach(ids.add) + } + } + } + + new util.ArrayList[EdgeId](ids) + } + } + } + + + override def fetchVertexIds(hasContainers: util.List[HasContainer]): util.List[VertexId] = + Await.result(fetchVertexIdsAsync(hasContainers), WaitTime) + + override def fetchVertexIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[VertexId]] = { + val field = vidField + val ids = new java.util.HashSet[VertexId] + + GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers) match { + case None => Future.successful(new util.ArrayList[VertexId](ids)) + case Some(globalIndex) => + val queryString = buildQueryString(hasContainers) + + client.execute { + search(globalIndex.backendIndexName).query(queryString) + }.map { ret => + ret match { + case Left(failure) => + case Right(results) => + results.result.hits.hits.foreach { searchHit => + searchHit.sourceAsMap.get(field).foreach { idValue => + val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get + //TODO: Come up with better way to filter out hits with invalid meta. + VertexId.isValid(id).foreach(ids.add) + } + } + } + + new util.ArrayList[VertexId](ids) + } + } + } + + override def shutdown(): Unit = { + client.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index 2411e65..dcddadf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -22,35 +22,28 @@ package org.apache.s2graph.core.index import java.util import com.typesafe.config.Config -import org.apache.lucene.analysis.standard.StandardAnalyzer -import org.apache.lucene.document._ -import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} -import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} -import org.apache.lucene.search.IndexSearcher -import org.apache.lucene.store.{BaseDirectory, RAMDirectory} -import org.apache.s2graph.core.io.Conversions import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls._ -import org.apache.s2graph.core.types.{InnerValLike, VertexId} -import org.apache.s2graph.core.utils.logger -import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains, P} +import org.apache.s2graph.core.types.VertexId import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP} +import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains} import org.apache.tinkerpop.gremlin.structure.T -import play.api.libs.json.Json -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try object IndexProvider { import GlobalIndex._ val hitsPerPage = 100000 - def apply(config: Config): IndexProvider = { - val indexProviderType = "lucene" -// if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene" + def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = { + + val indexProviderType = Try { config.getString("index.provider") }.getOrElse("lucene") indexProviderType match { case "lucene" => new LuceneIndexProvider(config) + case "es" => new ESIndexProvider(config) } } @@ -135,186 +128,3 @@ trait IndexProvider { def shutdown(): Unit } - -class LuceneIndexProvider(config: Config) extends IndexProvider { - import IndexProvider._ - import scala.collection.mutable - import scala.collection.JavaConverters._ - import GlobalIndex._ - - val analyzer = new StandardAnalyzer() - val writers = mutable.Map.empty[String, IndexWriter] - val directories = mutable.Map.empty[String, BaseDirectory] - - private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = { - writers.getOrElseUpdate(indexName, { - val dir = directories.getOrElseUpdate(indexName, new RAMDirectory()) - val indexConfig = new IndexWriterConfig(analyzer) - new IndexWriter(dir, indexConfig) - }) - } - - private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): Option[Document] = { - val props = vertex.props.asScala - val exist = props.exists(t => globalIndex.propNamesSet(t._1)) - if (!exist) None - else { - val doc = new Document() - val id = vertex.id.toString - - doc.add(new StringField(vidField, id, Field.Store.YES)) - doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES)) - doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES)) - - props.foreach { case (dim, s2VertexProperty) => - val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO - val field = s2VertexProperty.columnMeta.dataType match { - case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) - case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) - } - doc.add(field) - } - - Option(doc) - } - } - - private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = { - val props = edge.getPropsWithTs().asScala - val exist = props.exists(t => globalIndex.propNamesSet(t._1)) - if (!exist) None - else { - val doc = new Document() - val id = edge.edgeId.toString - - doc.add(new StringField(eidField, id, Field.Store.YES)) - doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES)) - doc.add(new StringField(labelField, edge.label(), Field.Store.YES)) - - props.foreach { case (dim, s2Property) => - val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO - val field = s2Property.labelMeta.dataType match { - case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) - case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) - } - doc.add(field) - } - - Option(doc) - } - } - - override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType) - - globalIndexOptions.map { globalIndex => - val writer = getOrElseCreateIndexWriter(globalIndex.indexName) - - vertices.foreach { vertex => - toDocument(globalIndex, vertex).foreach { doc => - writer.addDocument(doc) - } - } - - writer.commit() - } - - vertices.map(_ => true) - } - - override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) - - globalIndexOptions.map { globalIndex => - val writer = getOrElseCreateIndexWriter(globalIndex.indexName) - - edges.foreach { edge => - toDocument(globalIndex, edge).foreach { doc => - writer.addDocument(doc) - } - } - - writer.commit() - } - - edges.map(_ => true) - } - - override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { - val field = eidField - val ids = new java.util.HashSet[EdgeId] - - GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { globalIndex => - val queryString = buildQueryString(hasContainers) - - try { - val q = new QueryParser(field, analyzer).parse(queryString) - - val reader = DirectoryReader.open(directories(globalIndex.indexName)) - val searcher = new IndexSearcher(reader) - - val docs = searcher.search(q, hitsPerPage) - - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get - ids.add(id); - } - - reader.close() - ids - } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids - } - } - - new util.ArrayList[EdgeId](ids) - } - - override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { - val field = vidField - val ids = new java.util.HashSet[VertexId] - - GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { globalIndex => - val queryString = buildQueryString(hasContainers) - - try { - val q = new QueryParser(field, analyzer).parse(queryString) - - val reader = DirectoryReader.open(directories(globalIndex.indexName)) - val searcher = new IndexSearcher(reader) - - val docs = searcher.search(q, hitsPerPage) - - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get - ids.add(id) - } - - reader.close() - ids - } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids - } - } - - new util.ArrayList[VertexId](ids) - } - - override def shutdown(): Unit = { - writers.foreach { case (_, writer) => writer.close() } - } - - override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) - - override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) - - override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices)) - - override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala new file mode 100644 index 0000000..0670d48 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala @@ -0,0 +1,208 @@ +package org.apache.s2graph.core.index + +import java.util + +import com.typesafe.config.Config +import org.apache.lucene.analysis.standard.StandardAnalyzer +import org.apache.lucene.document.{Document, Field, StringField} +import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} +import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} +import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.store.{BaseDirectory, RAMDirectory} +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} +import org.apache.s2graph.core.io.Conversions +import org.apache.s2graph.core.mysqls.GlobalIndex +import org.apache.s2graph.core.mysqls.GlobalIndex.{eidField, labelField, serviceColumnField, serviceField, vidField} +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.utils.logger +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer +import play.api.libs.json.Json + +import scala.concurrent.Future + + +class LuceneIndexProvider(config: Config) extends IndexProvider { + import IndexProvider._ + import scala.collection.mutable + import scala.collection.JavaConverters._ + import GlobalIndex._ + + val analyzer = new StandardAnalyzer() + val writers = mutable.Map.empty[String, IndexWriter] + val directories = mutable.Map.empty[String, BaseDirectory] + + private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = { + writers.getOrElseUpdate(indexName, { + val dir = directories.getOrElseUpdate(indexName, new RAMDirectory()) + val indexConfig = new IndexWriterConfig(analyzer) + new IndexWriter(dir, indexConfig) + }) + } + + private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): Option[Document] = { + val props = vertex.props.asScala + val filtered = props.filterKeys(globalIndex.propNamesSet) + + if (filtered.isEmpty) None + else { + val doc = new Document() + val id = vertex.id.toString + + doc.add(new StringField(vidField, id, Field.Store.YES)) + doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES)) + doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES)) + + filtered.foreach { case (dim, s2VertexProperty) => + val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO + val field = s2VertexProperty.columnMeta.dataType match { + case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + } + doc.add(field) + } + + Option(doc) + } + } + + private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = { + val props = edge.getPropsWithTs().asScala + val filtered = props.filterKeys(globalIndex.propNamesSet) + + if (filtered.isEmpty) None + else { + val doc = new Document() + val id = edge.edgeId.toString + + doc.add(new StringField(eidField, id, Field.Store.YES)) + doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES)) + doc.add(new StringField(labelField, edge.label(), Field.Store.YES)) + + filtered.foreach { case (dim, s2Property) => + val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO + val field = s2Property.labelMeta.dataType match { + case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) + } + doc.add(field) + } + + Option(doc) + } + } + + override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = { + val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType) + + globalIndexOptions.map { globalIndex => + val writer = getOrElseCreateIndexWriter(globalIndex.indexName) + + vertices.foreach { vertex => + toDocument(globalIndex, vertex).foreach { doc => + writer.addDocument(doc) + } + } + + writer.commit() + } + + vertices.map(_ => true) + } + + override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = { + val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) + + globalIndexOptions.map { globalIndex => + val writer = getOrElseCreateIndexWriter(globalIndex.indexName) + + edges.foreach { edge => + toDocument(globalIndex, edge).foreach { doc => + writer.addDocument(doc) + } + } + + writer.commit() + } + + edges.map(_ => true) + } + + override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { + val field = eidField + val ids = new java.util.HashSet[EdgeId] + + GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { globalIndex => + val queryString = buildQueryString(hasContainers) + + try { + val q = new QueryParser(field, analyzer).parse(queryString) + + val reader = DirectoryReader.open(directories(globalIndex.indexName)) + val searcher = new IndexSearcher(reader) + + val docs = searcher.search(q, hitsPerPage) + + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get + ids.add(id); + } + + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + ids + } + } + + new util.ArrayList[EdgeId](ids) + } + + override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { + val field = vidField + val ids = new java.util.HashSet[VertexId] + + GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { globalIndex => + val queryString = buildQueryString(hasContainers) + + try { + val q = new QueryParser(field, analyzer).parse(queryString) + + val reader = DirectoryReader.open(directories(globalIndex.indexName)) + val searcher = new IndexSearcher(reader) + + val docs = searcher.search(q, hitsPerPage) + + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get + ids.add(id) + } + + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + ids + } + } + + new util.ArrayList[VertexId](ids) + } + + override def shutdown(): Unit = { + writers.foreach { case (_, writer) => writer.close() } + } + + override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) + + override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) + + override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices)) + + override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala index a918db5..d756ddf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala @@ -91,5 +91,7 @@ case class GlobalIndex(id: Option[Int], elementType: String, propNames: Seq[String], indexName: String) { + val backendIndexName = indexName + "_" + elementType + val backendIndexNameWithType = backendIndexName + "/test1" lazy val propNamesSet = propNames.toSet } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala index eb00405..b1de0c4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala @@ -59,14 +59,14 @@ object VertexId extends HBaseDeserializable { } def fromString(s: String): VertexId = { - -// val Array(serviceId, columnName, innerValStr) = s.split(S2Vertex.VertexLabelDelimiter) -// val service = Service.findById(serviceId.toInt) -// val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new LabelNotExistException(columnName)) -// val innerId = JSONParser.toInnerVal(innerValStr, column.columnType, column.schemaVersion) -// VertexId(column, innerId) s2VertexIdReads.reads(Json.parse(s)).get } + + def isValid(vertexId: VertexId): Option[VertexId] = { + ServiceColumn.find(vertexId.column.serviceId, vertexId.column.columnName).map { column => + vertexId + } + } } class VertexId (val column: ServiceColumn, val innerId: InnerValLike) extends HBaseSerializable with Comparable[VertexId] { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index c720b9f..08f075d 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.Integrate import com.typesafe.config._ -import org.apache.s2graph.core.mysqls.{Label, Model} +import org.apache.s2graph.core.mysqls.{GlobalIndex, Label, Model} import org.apache.s2graph.core.rest.{RequestParser, RestHandler} import org.apache.s2graph.core.utils.logger import org.apache.s2graph.core._ @@ -98,6 +98,12 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { Management.addVertexProp(testServiceName, testColumnName, key, keyType) } + // vertex type global index. + val globalVertexIndex = management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", Seq("age")) + + // edge type global index. + val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test_weight_time_edge", Seq("weight", "time")) + logger.info("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala index 8294a8f..3871399 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.index import org.apache.s2graph.core.Integrate.IntegrateCommon -import org.apache.s2graph.core.{Query, QueryParam, S2Vertex, Step} +import org.apache.s2graph.core._ import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.apache.tinkerpop.gremlin.process.traversal.{Order, P} import org.apache.s2graph.core.mysqls._ @@ -31,120 +31,122 @@ import org.apache.tinkerpop.gremlin.structure.T import scala.collection.JavaConversions._ class IndexProviderTest extends IntegrateCommon { + import scala.concurrent.ExecutionContext.Implicits.global val indexProvider = IndexProvider(config) val numOfTry = 1 - lazy val gIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test1", Seq("_timestamp", "weight", "time")) - -// test("test vertex write/query") { -// gIndex -// import TestUtil._ -//// Management.addVertexProp(testServiceName, testColumnName, "time", "long") -// -// val testService = Service.findByName(TestUtil.testServiceName).get -// val testColumn = ServiceColumn.find(testService.id.get, TestUtil.testColumnName).get -// val vertexId = graph.newVertexId(testServiceName)(testColumnName)(1L) -// -// val propsWithTs = Map( -//// testColumn.metasInvMap("time") -> InnerVal.withLong(1L, "v4") -// ColumnMeta.timestamp -> InnerVal.withLong(1L, "v4") -// ) -// val otherPropsWithTs = Map( -//// testColumn.metasInvMap("time") -> InnerVal.withLong(2L, "v4") -// ColumnMeta.timestamp -> InnerVal.withLong(2L, "v4") -// ) -// val vertex = graph.newVertex(vertexId) -// S2Vertex.fillPropsWithTs(vertex, propsWithTs) -// -// val otherVertex = graph.newVertex(vertexId) -// S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs) -// -// val numOfOthers = 10 -// val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex) -// -// println(s"[# of vertices]: ${vertices.size}") -// vertices.foreach(v => println(s"[Vertex]: $v")) -// indexProvider.mutateVertices(vertices) -// -// (0 until numOfTry).foreach { ith => -// val hasContainer = new HasContainer("_timestamp", P.eq(Long.box(1))) -// -// var ids = indexProvider.fetchVertexIds(Seq(hasContainer)) -// ids.head shouldBe vertex.id -// -// ids.foreach { id => -// println(s"[Id]: $id") -// } -// } -// } -// -// test("test edge write/query ") { -// import TestUtil._ -// val testLabelName = TestUtil.testLabelName -// val testLabel = Label.findByName(testLabelName).getOrElse(throw new IllegalArgumentException) -// val vertexId = graph.newVertexId(testServiceName)(testColumnName)(1L) -// val otherVertexId = graph.newVertexId(testServiceName)(testColumnName)(2L) -// val vertex = graph.newVertex(vertexId) -// val otherVertex = graph.newVertex(otherVertexId) -// -// val propsWithTs = Map( -// LabelMeta.timestamp -> InnerValLikeWithTs.withLong(1L, 1L, "v4"), -// testLabel.metaPropsInvMap("time") -> InnerValLikeWithTs.withLong(10L, 1L, "v4") -// ) -// val otherPropsWithTs = Map( -// LabelMeta.timestamp -> InnerValLikeWithTs.withLong(2L, 2L, "v4"), -// testLabel.metaPropsInvMap("time") -> InnerValLikeWithTs.withLong(20L, 2L, "v4") -// ) -// val edge = graph.newEdge(vertex, vertex, testLabel, 0, propsWithTs = propsWithTs) -// val otherEdge = graph.newEdge(otherVertex, otherVertex, testLabel, 0, propsWithTs = otherPropsWithTs) -// val numOfOthers = 10 -// val edges = Seq(edge) ++ (0 until numOfOthers).map(_ => otherEdge) -// -// println(s"[# of edges]: ${edges.size}") -// edges.foreach(e => println(s"[Edge]: $e")) -// indexProvider.mutateEdges(edges) -// -// // match -// (0 until numOfTry).foreach { _ => -// val hasContainers = Seq(new HasContainer("time", P.eq(Int.box(10))), -// new HasContainer("_timestamp", P.eq(Int.box(1)))) -// val ids = indexProvider.fetchEdgeIds(hasContainers) -// ids.head shouldBe edge.edgeId -// -// ids.foreach { id => -// println(s"[Id]: $id") -// } -// } -// -// // match and not -// (0 until numOfTry).foreach { _ => -// val hasContainers = Seq(new HasContainer("time", P.eq(Int.box(20))), -// new HasContainer("_timestamp", P.neq(Int.box(1)))) -// val ids = indexProvider.fetchEdgeIds(hasContainers) -// // ids.size shouldBe 0 -// // distinct make ids size to 1 -//// ids.size shouldBe numOfOthers -// -// ids.foreach { id => -// id shouldBe otherEdge.edgeId -// println(s"[Id]: $id") -// } -// } -// -// // range -// (0 until numOfTry).foreach { _ => -// val hasContainers = Seq(new HasContainer("time", -// P.inside(Int.box(0), Int.box(11)))) -// val ids = indexProvider.fetchEdgeIds(hasContainers) -// // ids.size shouldBe 0 -// ids.size shouldBe 1 -// -// ids.foreach { id => -// id shouldBe edge.edgeId -// println(s"[Id]: $id") -// } -// } -// } + test("test vertex write/query") { + import TestUtil._ + + val testService = Service.findByName(TestUtil.testServiceName).get + val testColumn = ServiceColumn.find(testService.id.get, TestUtil.testColumnName).get + val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L) + val indexPropsColumnMeta = testColumn.metasInvMap("age") + + + val propsWithTs = Map( + indexPropsColumnMeta -> InnerVal.withInt(1, "v4") + ) + val otherPropsWithTs = Map( + indexPropsColumnMeta -> InnerVal.withInt(2, "v4") + ) + val vertex = graph.elementBuilder.newVertex(vertexId) + S2Vertex.fillPropsWithTs(vertex, propsWithTs) + + val otherVertex = graph.elementBuilder.newVertex(vertexId) + S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs) + + val numOfOthers = 10 + val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex) + + println(s"[# of vertices]: ${vertices.size}") + vertices.foreach(v => println(s"[Vertex]: $v, ${v.props}")) + indexProvider.mutateVertices(vertices) + + (0 until numOfTry).foreach { ith => + val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1))) + + var ids = indexProvider.fetchVertexIds(Seq(hasContainer)) + ids.head shouldBe vertex.id + + ids.foreach { id => + println(s"[Id]: $id") + } + } + } + + test("test edge write/query ") { + import TestUtil._ + val testLabelName = TestUtil.testLabelName + val testLabel = Label.findByName(testLabelName).getOrElse(throw new IllegalArgumentException) + + val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L) + val otherVertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L) + val vertex = graph.elementBuilder.newVertex(vertexId) + val otherVertex = graph.elementBuilder.newVertex(otherVertexId) + val weightMeta = testLabel.metaPropsInvMap("weight") + val timeMeta = testLabel.metaPropsInvMap("time") + + val propsWithTs = Map( + weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"), + timeMeta -> InnerValLikeWithTs.withLong(10L, 1L, "v4") + ) + val otherPropsWithTs = Map( + weightMeta -> InnerValLikeWithTs.withLong(2L, 2L, "v4"), + timeMeta -> InnerValLikeWithTs.withLong(20L, 2L, "v4") + ) + + val edge = graph.elementBuilder.newEdge(vertex, vertex, testLabel, 0, propsWithTs = propsWithTs) + val otherEdge = graph.elementBuilder.newEdge(otherVertex, otherVertex, testLabel, 0, propsWithTs = otherPropsWithTs) + val numOfOthers = 10 + val edges = Seq(edge) ++ (0 until numOfOthers).map(_ => otherEdge) + + println(s"[# of edges]: ${edges.size}") + edges.foreach(e => println(s"[Edge]: $e")) + indexProvider.mutateEdges(edges) + + // match + (0 until numOfTry).foreach { _ => + val hasContainers = Seq(new HasContainer(timeMeta.name, P.eq(Int.box(10))), + new HasContainer(weightMeta.name, P.eq(Int.box(1)))) + + val ids = indexProvider.fetchEdgeIds(hasContainers) + ids.head shouldBe edge.edgeId + + ids.foreach { id => + println(s"[Id]: $id") + } + } + + // match and not + (0 until numOfTry).foreach { _ => + val hasContainers = Seq(new HasContainer(timeMeta.name, P.eq(Int.box(20))), + new HasContainer(weightMeta.name, P.neq(Int.box(1)))) + val ids = indexProvider.fetchEdgeIds(hasContainers) + // ids.size shouldBe 0 + // distinct make ids size to 1 +// ids.size shouldBe numOfOthers + + ids.foreach { id => + id shouldBe otherEdge.edgeId + println(s"[Id]: $id") + } + } + + // range + (0 until numOfTry).foreach { _ => + val hasContainers = Seq(new HasContainer(timeMeta.name, + P.inside(Int.box(0), Int.box(11)))) + val ids = indexProvider.fetchEdgeIds(hasContainers) + // ids.size shouldBe 0 + ids.size shouldBe 1 + + ids.foreach { id => + id shouldBe edge.edgeId + println(s"[Id]: $id") + } + } + } test("buildQuerySingleString") { // (weight: 34) AND (weight: [0.5 TO *] AND price: 30) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index 3a73d20..bf3291e 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -96,6 +96,7 @@ class S2GraphProvider extends AbstractGraphProvider { val defaultServiceColumn = ServiceColumn.find(defaultService.id.get, S2Graph.DefaultColumnName).getOrElse(throw new IllegalStateException("default column is not initialized.")) val allProps = scala.collection.mutable.Set.empty[Prop] + var knowsProp = Vector( Prop("weight", "0.0", "double"), Prop("data", "-", "string"),
