Change insert to update, and use VertexId and EdgeId as the document IDs for the global index.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f164d1d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f164d1d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f164d1d0

Branch: refs/heads/master
Commit: f164d1d0984d53dfca7f5a0c87355fbe7b7942cf
Parents: 63012f8
Author: DO YUNG YOON <[email protected]>
Authored: Mon Feb 12 16:08:16 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Feb 12 16:08:16 2018 +0900

----------------------------------------------------------------------
 .../s2graph/core/index/ESIndexProvider.scala    | 16 +++++-------
 .../s2graph/core/index/IndexProvider.scala      |  2 ++
 .../core/index/LuceneIndexProvider.scala        | 18 ++++++++-----
 .../s2graph/core/mysqls/GlobalIndex.scala       |  5 +++-
 .../s2graph/core/index/IndexProviderTest.scala  | 27 +++++++++++++-------
 5 files changed, 42 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index cb74cf1..6499874 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -2,7 +2,7 @@ package org.apache.s2graph.core.index
 
 import java.util
 
-import com.sksamuel.elastic4s.ElasticsearchClientUri
+import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
 import com.sksamuel.elastic4s.http.ElasticDsl._
 import com.sksamuel.elastic4s.http.HttpClient
 import com.typesafe.config.Config
@@ -16,6 +16,7 @@ import play.api.libs.json.Json
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.Try
 
 class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends 
IndexProvider {
   import GlobalIndex._
@@ -25,11 +26,8 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
 
   implicit val executor = ec
 
-  val esClientUri = "localhost"
-//  //    config.getString("es.index.provider.client.uri")
+  val esClientUri = 
Try(config.getString("es.index.provider.client.uri")).getOrElse("localhost")
   val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200))
-//  val node = LocalNode("elasticsearch", "./es")
-//  val client = node.http(true)
 
   val WaitTime = Duration("60 seconds")
 
@@ -87,7 +85,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
   override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: 
Boolean = false): Future[Seq[Boolean]] = {
     val bulkRequests = vertices.flatMap { vertex =>
         toFields(vertex, forceToIndex).toSeq.map { fields =>
-          indexInto(GlobalIndex.TableName).fields(fields)
+          update(vertex.id.toString()).in(new 
IndexAndType(GlobalIndex.VertexIndexName, 
GlobalIndex.TypeName)).docAsUpsert(fields)
         }
       }
 
@@ -115,7 +113,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
   override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean 
= false): Future[Seq[Boolean]] = {
     val bulkRequests = edges.flatMap { edge =>
       toFields(edge, forceToIndex).toSeq.map { fields =>
-        indexInto(GlobalIndex.TableName).fields(fields)
+        update(edge.edgeId.toString()).in(new 
IndexAndType(GlobalIndex.EdgeIndexName, 
GlobalIndex.TypeName)).docAsUpsert(fields)
       }
     }
 
@@ -142,7 +140,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
     val queryString = buildQueryString(hasContainers)
 
     client.execute {
-      search(GlobalIndex.TableName).query(queryString)
+      search(GlobalIndex.EdgeIndexName).query(queryString)
     }.map { ret =>
       ret match {
         case Left(failure) =>
@@ -172,7 +170,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
     val queryString = buildQueryString(hasContainers)
 
     client.execute {
-      search(GlobalIndex.TableName).query(queryString)
+      search(GlobalIndex.VertexIndexName).query(queryString)
     }.map { ret =>
       ret match {
         case Left(failure) =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index 864209f..ae632df 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -35,7 +35,9 @@ import scala.util.Try
 
 object IndexProvider {
   import GlobalIndex._
+  //TODO: Fix Me
   val hitsPerPage = 100000
+  val IdField = "id"
 
   def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = {
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index b750499..8cafb89 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -5,7 +5,7 @@ import java.util
 import com.typesafe.config.Config
 import org.apache.lucene.analysis.standard.StandardAnalyzer
 import org.apache.lucene.document.{Document, Field, StringField}
-import org.apache.lucene.index.{DirectoryReader, IndexWriter, 
IndexWriterConfig}
+import org.apache.lucene.index.{DirectoryReader, IndexWriter, 
IndexWriterConfig, Term}
 import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
 import org.apache.lucene.search.IndexSearcher
 import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
@@ -100,11 +100,13 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     Future.successful(mutateEdges(edges, forceToIndex))
 
   override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: 
Boolean = false): Seq[Boolean] = {
-    val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName)
+    val writer = getOrElseCreateIndexWriter(GlobalIndex.VertexIndexName)
 
     vertices.foreach { vertex =>
       toDocument(vertex, forceToIndex).foreach { doc =>
-        writer.addDocument(doc)
+        val vId = vertex.id.toString()
+
+        writer.updateDocument(new Term(IdField, vId), doc)
       }
     }
 
@@ -117,11 +119,13 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     Future.successful(mutateVertices(vertices, forceToIndex))
 
   override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = 
false): Seq[Boolean] = {
-    val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName)
+    val writer = getOrElseCreateIndexWriter(GlobalIndex.EdgeIndexName)
 
     edges.foreach { edge =>
       toDocument(edge, forceToIndex).foreach { doc =>
-        writer.addDocument(doc)
+        val eId = edge.edgeId.toString
+
+        writer.updateDocument(new Term(IdField, eId), doc)
       }
     }
 
@@ -139,7 +143,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
 
-      val reader = DirectoryReader.open(directories(GlobalIndex.TableName))
+      val reader = DirectoryReader.open(directories(GlobalIndex.EdgeIndexName))
       val searcher = new IndexSearcher(reader)
 
       val docs = searcher.search(q, hitsPerPage)
@@ -169,7 +173,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
 
-      val reader = DirectoryReader.open(directories(GlobalIndex.TableName))
+      val reader = 
DirectoryReader.open(directories(GlobalIndex.VertexIndexName))
       val searcher = new IndexSearcher(reader)
 
       val docs = searcher.search(q, hitsPerPage)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
index d756ddf..501a964 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
@@ -33,7 +33,10 @@ object GlobalIndex extends Model[GlobalIndex] {
   val VertexType = "vertex"
   val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, 
serviceColumnField)
 
-  val TableName = "global_indices"
+//  val IndexName = "global_indices"
+  val VertexIndexName = "global_vertex_index"
+  val EdgeIndexName = "global_edge_index"
+  val TypeName = "test"
 
   def apply(rs: WrappedResultSet): GlobalIndex = {
     GlobalIndex(rs.intOpt("id"),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index 9ed12e6..4889412 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -43,7 +43,6 @@ class IndexProviderTest extends IntegrateCommon {
     val testColumn = ServiceColumn.find(testService.id.get, 
TestUtil.testColumnName).get
     val vertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L)
     val indexPropsColumnMeta = testColumn.metasInvMap("age")
-    ColumnMeta.updateStoreInGlobalIndex(indexPropsColumnMeta.id.get, 
storeInGlobalIndex = true)
 
     val propsWithTs = Map(
       indexPropsColumnMeta -> InnerVal.withInt(1, "v4")
@@ -57,12 +56,22 @@ class IndexProviderTest extends IntegrateCommon {
     val otherVertex = graph.elementBuilder.newVertex(vertexId)
     S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs)
 
-    val numOfOthers = 10
-    val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex)
+    val numOfOthers = 20
+    val vertices = Seq(vertex) ++ (10 until numOfOthers).map { ith =>
+      val vertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(ith)
+
+      val v = graph.elementBuilder.newVertex(vertexId)
+      S2Vertex.fillPropsWithTs(v, otherPropsWithTs)
+
+      v
+    }
 
     println(s"[# of vertices]: ${vertices.size}")
     vertices.foreach(v => println(s"[Vertex]: $v, ${v.props}"))
-    indexProvider.mutateVertices(vertices)
+    indexProvider.mutateVertices(vertices, forceToIndex = true)
+
+    // enough time for elastic search to persist docs.
+    Thread.sleep(1000)
 
     (0 until numOfTry).foreach { ith =>
       val hasContainer = new HasContainer(indexPropsColumnMeta.name, 
P.eq(Long.box(1)))
@@ -85,11 +94,8 @@ class IndexProviderTest extends IntegrateCommon {
     val otherVertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L)
     val vertex = graph.elementBuilder.newVertex(vertexId)
     val otherVertex = graph.elementBuilder.newVertex(otherVertexId)
-    val weightMeta = 
testLabel.metaPropsInvMap("weight").copy(storeInGlobalIndex = true)
-    val timeMeta = testLabel.metaPropsInvMap("time").copy(storeInGlobalIndex = 
true)
-
-    LabelMeta.updateStoreInGlobalIndex(weightMeta.id.get, storeInGlobalIndex = 
true)
-    LabelMeta.updateStoreInGlobalIndex(timeMeta.id.get, storeInGlobalIndex = 
true)
+    val weightMeta = testLabel.metaPropsInvMap("weight")
+    val timeMeta = testLabel.metaPropsInvMap("time")
 
     val propsWithTs = Map(
       weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"),
@@ -109,6 +115,9 @@ class IndexProviderTest extends IntegrateCommon {
     edges.foreach(e => println(s"[Edge]: $e"))
     indexProvider.mutateEdges(edges, forceToIndex = true)
 
+    // enough time for elastic search to persist docs.
+    Thread.sleep(1000)
+
     // match
     (0 until numOfTry).foreach { _ =>
       val hasContainers = Seq(new HasContainer(timeMeta.name, 
P.eq(Int.box(10))),

Reply via email to