Add IndexProvider.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6065c87b Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6065c87b Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6065c87b Branch: refs/heads/master Commit: 6065c87be004651bf629803d37be2d745c72fc3a Parents: 1f9693a Author: DO YUNG YOON <[email protected]> Authored: Mon Jul 10 19:21:11 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Jul 10 19:21:11 2017 +0900 ---------------------------------------------------------------------- s2core/build.sbt | 4 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 46 ++++++++++- .../org/apache/s2graph/core/S2Vertex.scala | 85 ++++++++++---------- .../s2graph/core/index/IndexProvider.scala | 81 +++++++++++++++++++ 4 files changed, 170 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 9cfc966..8033581 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -46,7 +46,9 @@ libraryDependencies ++= Seq( "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test", "org.scalatest" %% "scalatest" % "2.2.4" % "test", "org.specs2" %% "specs2-core" % specs2Version % "test", - "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion + "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion , + "org.apache.lucene" % "lucene-core" % "6.6.0", + "org.apache.lucene" % "lucene-queryparser" % "6.6.0" ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 342b841..a154419 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -28,6 +28,7 @@ import org.apache.commons.configuration.{BaseConfiguration, Configuration} import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException} import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.features.S2GraphVariables +import org.apache.s2graph.core.index.{IndexProvider, LuceneIndexProvider} import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.storage.{SKeyValue, Storage} @@ -35,6 +36,7 @@ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} import org.apache.tinkerpop.gremlin.process.computer.GraphComputer import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables} import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper} import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex} @@ -945,6 +947,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph (k, v) = (entry.getKey, entry.getValue) } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") + val indexProvider = IndexProvider.apply(config) def getStorage(service: Service): Storage[_, _] = { storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) @@ -1820,9 +1823,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph addVertexInner(vertex) } - - - def addVertex(id: VertexId, ts: Long = System.currentTimeMillis(), props: S2Vertex.Props = S2Vertex.EmptyProps, @@ 
-1844,6 +1844,46 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph Await.result(future, WaitTimeout) } + def addEdge(srcVertex: S2Vertex, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = { + val containsId = kvs.contains(T.id) + + tgtVertex match { + case otherV: S2Vertex => + if (!features().edge().supportsUserSuppliedIds() && containsId) { + throw Exceptions.userSuppliedIdsNotSupported() + } + + val props = S2Property.kvsToProps(kvs) + + props.foreach { case (k, v) => S2Property.assertValidProp(k, v) } + + //TODO: direction, operation, _timestamp need to be reserved property key. + + try { + val direction = props.get("direction").getOrElse("out").toString + val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis()) + val operation = props.get("operation").map(_.toString).getOrElse("insert") + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) + + indexProvider.mutateEdges(Seq(edge)) + + val future = mutateEdges(Seq(edge), withWait = true) + Await.ready(future, WaitTimeout) + edge + } catch { + case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e) + } + case null => throw new java.lang.IllegalArgumentException + case _ => throw new RuntimeException("only S2Graph vertex can be used.") + } + } + override def close(): Unit = { shutdown() } 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala index d1e2eda..d485a01 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala @@ -200,48 +200,49 @@ case class S2Vertex(graph: S2Graph, } override def addEdge(labelName: String, vertex: Vertex, kvs: AnyRef*): Edge = { - val containsId = kvs.contains(T.id) - vertex match { - case otherV: S2Vertex => - if (!graph.features().edge().supportsUserSuppliedIds() && containsId) { - throw Exceptions.userSuppliedIdsNotSupported() - } - - val props = S2Property.kvsToProps(kvs) - - props.foreach { case (k, v) => S2Property.assertValidProp(k, v) } - - //TODO: direction, operation, _timestamp need to be reserved property key. - - try { - val direction = props.get("direction").getOrElse("out").toString - val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis()) - val operation = props.get("operation").map(_.toString).getOrElse("insert") - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) - val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) - val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - - val edge = graph.newEdge(this, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) -// //TODO: return type of mutateEdges can contains information if snapshot edge already exist. 
-// // instead call checkEdges, we can exploit this feature once we refactor return type. -// implicit val ec = graph.ec -// val future = graph.checkEdges(Seq(edge)).flatMap { stepResult => -// if (stepResult.edgeWithScores.nonEmpty) -// Future.failed(throw Graph.Exceptions.edgeWithIdAlreadyExists(edge.id())) -// else -// graph.mutateEdges(Seq(edge), withWait = true) -// } - val future = graph.mutateEdges(Seq(edge), withWait = true) - Await.ready(future, graph.WaitTimeout) - edge - } catch { - case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e) - } - case null => throw new java.lang.IllegalArgumentException - case _ => throw new RuntimeException("only S2Graph vertex can be used.") - } + graph.addEdge(this, labelName, vertex, kvs: _*) +// val containsId = kvs.contains(T.id) +// vertex match { +// case otherV: S2Vertex => +// if (!graph.features().edge().supportsUserSuppliedIds() && containsId) { +// throw Exceptions.userSuppliedIdsNotSupported() +// } +// +// val props = S2Property.kvsToProps(kvs) +// +// props.foreach { case (k, v) => S2Property.assertValidProp(k, v) } +// +// //TODO: direction, operation, _timestamp need to be reserved property key. 
+// +// try { +// val direction = props.get("direction").getOrElse("out").toString +// val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis()) +// val operation = props.get("operation").map(_.toString).getOrElse("insert") +// val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) +// val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) +// val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) +// val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) +// val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) +// +// val edge = graph.newEdge(this, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) +//// //TODO: return type of mutateEdges can contains information if snapshot edge already exist. +//// // instead call checkEdges, we can exploit this feature once we refactor return type. 
+//// implicit val ec = graph.ec +//// val future = graph.checkEdges(Seq(edge)).flatMap { stepResult => +//// if (stepResult.edgeWithScores.nonEmpty) +//// Future.failed(throw Graph.Exceptions.edgeWithIdAlreadyExists(edge.id())) +//// else +//// graph.mutateEdges(Seq(edge), withWait = true) +//// } +// val future = graph.mutateEdges(Seq(edge), withWait = true) +// Await.ready(future, graph.WaitTimeout) +// edge +// } catch { +// case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e) +// } +// case null => throw new java.lang.IllegalArgumentException +// case _ => throw new RuntimeException("only S2Graph vertex can be used.") +// } } override def property[V](key: String): VertexProperty[V] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala new file mode 100644 index 0000000..a1c8c40 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -0,0 +1,81 @@ +package org.apache.s2graph.core.index + +import com.typesafe.config.Config +import org.apache.lucene.analysis.standard.StandardAnalyzer +import org.apache.lucene.document.{Document, Field, StringField, TextField} +import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} +import org.apache.lucene.queryparser.classic.QueryParser +import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.store.RAMDirectory +import org.apache.s2graph.core.io.Conversions +import org.apache.s2graph.core.mysqls.ColumnMeta +import org.apache.s2graph.core.{EdgeId, S2Edge} +import org.apache.s2graph.core.types.InnerValLike +import play.api.libs.json.Json + +object IndexProvider { + val edgeIdField = "_edgeId_" + def 
apply(config: Config): IndexProvider = { +    val indexProviderType = +      if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene" + +    indexProviderType match { +      case "lucene" => new LuceneIndexProvider(config) +    } +  } +} + +trait IndexProvider { +  //TODO: Seq needs to be changed into a stream +  def fetchEdges(indexProps: Seq[(ColumnMeta, InnerValLike)]): Seq[EdgeId] + +  def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] + +  def shutdown(): Unit +} + +class LuceneIndexProvider(config: Config) extends IndexProvider { +  import IndexProvider._ + +  val analyzer = new StandardAnalyzer() +  val directory = new RAMDirectory() +  val indexConfig = new IndexWriterConfig(analyzer) +  val reader = DirectoryReader.open(directory) +  val writer = new IndexWriter(directory, indexConfig) + +  override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = { +    edges.map { edge => +      val doc = new Document() +      val edgeIdString = edge.edgeId.toString +      doc.add(new StringField(edgeIdField, edgeIdString, Field.Store.YES)) + +      edge.properties.foreach { case (dim, value) => +        doc.add(new TextField(dim, value.toString, Field.Store.YES)) +      } +      writer.addDocument(doc) +    } + +    edges.map(_ => true) +  } + +  override def fetchEdges(indexProps: Seq[(ColumnMeta, InnerValLike)]): Seq[EdgeId] = { +    val queryStr = indexProps.map { case (columnMeta, value) => +       columnMeta.name + ": " + value.toString() +    }.mkString(" ") + +    val q = new QueryParser(edgeIdField, analyzer).parse(queryStr) +    val hitsPerPage = 10 +    val searcher = new IndexSearcher(reader) + +    val docs = searcher.search(q, hitsPerPage) +    docs.scoreDocs.map { scoreDoc => +      val document = searcher.doc(scoreDoc.doc) +      Conversions.s2EdgeIdReads.reads(Json.parse(document.get(edgeIdField))).get +    } +  } + +  override def shutdown(): Unit = { +    writer.close() +    reader.close() +  } +} \ No newline at end of file
