add IndexProvider.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6065c87b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6065c87b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6065c87b

Branch: refs/heads/master
Commit: 6065c87be004651bf629803d37be2d745c72fc3a
Parents: 1f9693a
Author: DO YUNG YOON <[email protected]>
Authored: Mon Jul 10 19:21:11 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Jul 10 19:21:11 2017 +0900

----------------------------------------------------------------------
 s2core/build.sbt                                |  4 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala | 46 ++++++++++-
 .../org/apache/s2graph/core/S2Vertex.scala      | 85 ++++++++++----------
 .../s2graph/core/index/IndexProvider.scala      | 81 +++++++++++++++++++
 4 files changed, 170 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 9cfc966..8033581 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -46,7 +46,9 @@ libraryDependencies ++= Seq(
   "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test",
   "org.scalatest" %% "scalatest" % "2.2.4" % "test",
   "org.specs2" %% "specs2-core" % specs2Version % "test",
-  "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion 
+  "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
+  "org.apache.lucene" % "lucene-core" % "6.6.0",
+  "org.apache.lucene" % "lucene-queryparser" % "6.6.0"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 342b841..a154419 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -28,6 +28,7 @@ import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.features.S2GraphVariables
+import org.apache.s2graph.core.index.{IndexProvider, LuceneIndexProvider}
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.{SKeyValue, Storage}
@@ -35,6 +36,7 @@ import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
 import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
 import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
 import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper}
 import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
@@ -945,6 +947,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     (k, v) = (entry.getKey, entry.getValue)
   } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
 
+  val indexProvider = IndexProvider.apply(config)
 
   def getStorage(service: Service): Storage[_, _] = {
     storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
@@ -1820,9 +1823,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     addVertexInner(vertex)
   }
 
-
-
-
   def addVertex(id: VertexId,
                 ts: Long = System.currentTimeMillis(),
                 props: S2Vertex.Props = S2Vertex.EmptyProps,
@@ -1844,6 +1844,46 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     Await.result(future, WaitTimeout)
   }
 
+  def addEdge(srcVertex: S2Vertex, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
+    val containsId = kvs.contains(T.id)
+
+    tgtVertex match {
+      case otherV: S2Vertex =>
+        if (!features().edge().supportsUserSuppliedIds() && containsId) {
+          throw Exceptions.userSuppliedIdsNotSupported()
+        }
+
+        val props = S2Property.kvsToProps(kvs)
+
+        props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
+
+        //TODO: direction, operation, _timestamp need to be reserved property key.
+
+        try {
+          val direction = props.get("direction").getOrElse("out").toString
+          val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
+          val operation = props.get("operation").map(_.toString).getOrElse("insert")
+          val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+          val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+          val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+          val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+          val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+          val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+
+          indexProvider.mutateEdges(Seq(edge))
+
+          val future = mutateEdges(Seq(edge), withWait = true)
+          Await.ready(future, WaitTimeout)
+          edge
+        } catch {
+          case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
+        }
+      case null => throw new java.lang.IllegalArgumentException
+      case _ => throw new RuntimeException("only S2Graph vertex can be used.")
+    }
+  }
+
   override def close(): Unit = {
     shutdown()
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index d1e2eda..d485a01 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -200,48 +200,49 @@ case class S2Vertex(graph: S2Graph,
   }
 
   override def addEdge(labelName: String, vertex: Vertex, kvs: AnyRef*): Edge = {
-    val containsId = kvs.contains(T.id)
-    vertex match {
-      case otherV: S2Vertex =>
-        if (!graph.features().edge().supportsUserSuppliedIds() && containsId) {
-          throw Exceptions.userSuppliedIdsNotSupported()
-        }
-
-        val props = S2Property.kvsToProps(kvs)
-
-        props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
-
-        //TODO: direction, operation, _timestamp need to be reserved property key.
-
-        try {
-          val direction = props.get("direction").getOrElse("out").toString
-          val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
-          val operation = props.get("operation").map(_.toString).getOrElse("insert")
-          val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-          val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
-          val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
-          val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
-          val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-
-          val edge = graph.newEdge(this, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
-//          //TODO: return type of mutateEdges can contains information if snapshot edge already exist.
-//          // instead call checkEdges, we can exploit this feature once we refactor return type.
-//          implicit val ec = graph.ec
-//          val future = graph.checkEdges(Seq(edge)).flatMap { stepResult =>
-//            if (stepResult.edgeWithScores.nonEmpty)
-//              Future.failed(throw Graph.Exceptions.edgeWithIdAlreadyExists(edge.id()))
-//            else
-//              graph.mutateEdges(Seq(edge), withWait = true)
-//          }
-          val future = graph.mutateEdges(Seq(edge), withWait = true)
-          Await.ready(future, graph.WaitTimeout)
-          edge
-        } catch {
-          case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
-         }
-      case null => throw new java.lang.IllegalArgumentException
-      case _ => throw new RuntimeException("only S2Graph vertex can be used.")
-    }
+    graph.addEdge(this, labelName, vertex, kvs: _*)
+//    val containsId = kvs.contains(T.id)
+//    vertex match {
+//      case otherV: S2Vertex =>
+//        if (!graph.features().edge().supportsUserSuppliedIds() && containsId) {
+//          throw Exceptions.userSuppliedIdsNotSupported()
+//        }
+//
+//        val props = S2Property.kvsToProps(kvs)
+//
+//        props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
+//
+//        //TODO: direction, operation, _timestamp need to be reserved property key.
+//
+//        try {
+//          val direction = props.get("direction").getOrElse("out").toString
+//          val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
+//          val operation = props.get("operation").map(_.toString).getOrElse("insert")
+//          val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+//          val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+//          val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+//          val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+//          val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+//
+//          val edge = graph.newEdge(this, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+////          //TODO: return type of mutateEdges can contains information if snapshot edge already exist.
+////          // instead call checkEdges, we can exploit this feature once we refactor return type.
+////          implicit val ec = graph.ec
+////          val future = graph.checkEdges(Seq(edge)).flatMap { stepResult =>
+////            if (stepResult.edgeWithScores.nonEmpty)
+////              Future.failed(throw Graph.Exceptions.edgeWithIdAlreadyExists(edge.id()))
+////            else
+////              graph.mutateEdges(Seq(edge), withWait = true)
+////          }
+//          val future = graph.mutateEdges(Seq(edge), withWait = true)
+//          Await.ready(future, graph.WaitTimeout)
+//          edge
+//        } catch {
+//          case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
+//         }
+//      case null => throw new java.lang.IllegalArgumentException
+//      case _ => throw new RuntimeException("only S2Graph vertex can be used.")
+//    }
   }
 
   override def property[V](key: String): VertexProperty[V] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6065c87b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
new file mode 100644
index 0000000..a1c8c40
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -0,0 +1,81 @@
+package org.apache.s2graph.core.index
+
+import com.typesafe.config.Config
+import org.apache.lucene.analysis.standard.StandardAnalyzer
+import org.apache.lucene.document.{Document, Field, StringField, TextField}
+import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
+import org.apache.lucene.queryparser.classic.QueryParser
+import org.apache.lucene.search.IndexSearcher
+import org.apache.lucene.store.RAMDirectory
+import org.apache.s2graph.core.io.Conversions
+import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.{EdgeId, S2Edge}
+import org.apache.s2graph.core.types.InnerValLike
+import play.api.libs.json.Json
+
+object IndexProvider {
+  val edgeIdField = "_edgeId_"
+  def apply(config: Config): IndexProvider = {
+    val indexProviderType =
+      if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene"
+
+    indexProviderType match {
+      case "lucene" => new LuceneIndexProvider(config)
+    }
+  }
+}
+
+trait IndexProvider {
+  //TODO: Seq nee do be changed into stream
+  def fetchEdges(indexProps: Seq[(ColumnMeta, InnerValLike)]): Seq[EdgeId]
+
+  def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean]
+
+  def shutdown(): Unit
+}
+
+class LuceneIndexProvider(config: Config) extends IndexProvider {
+  import IndexProvider._
+
+  val analyzer = new StandardAnalyzer()
+  val directory = new RAMDirectory()
+  val indexConfig = new IndexWriterConfig(analyzer)
+  val reader = DirectoryReader.open(directory)
+  val writer = new IndexWriter(directory, indexConfig)
+
+  override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = {
+    edges.map { edge =>
+      val doc = new Document()
+      val edgeIdString = edge.edgeId.toString
+      doc.add(new StringField(edgeIdField, edgeIdString, Field.Store.YES))
+
+      edge.properties.foreach { case (dim, value) =>
+        doc.add(new TextField(dim, value.toString, Field.Store.YES))
+      }
+      writer.addDocument(doc)
+    }
+
+    edges.map(_ => true)
+  }
+
+  override def fetchEdges(indexProps: Seq[(ColumnMeta, InnerValLike)]): Seq[EdgeId] = {
+    val queryStr = indexProps.map { case (columnMeta, value) =>
+       columnMeta.name + ": " + value.toString()
+    }.mkString(" ")
+
+    val q = new QueryParser(edgeIdField, analyzer).parse(queryStr)
+    val hitsPerPage = 10
+    val searcher = new IndexSearcher(reader)
+
+    val docs = searcher.search(q, hitsPerPage)
+    docs.scoreDocs.map { scoreDoc =>
+      val document = searcher.doc(scoreDoc.doc)
+      Conversions.s2EdgeIdReads.reads(Json.parse(document.get(edgeIdField))).get
+    }
+  }
+
+  override def shutdown(): Unit = {
+    writer.close()
+    reader.close()
+  }
+}
\ No newline at end of file

Reply via email to