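Change global-index writes from insert to update (upsert), and use the VertexId / EdgeId as the document ID so that re-indexing the same vertex or edge overwrites its existing document instead of creating a duplicate; also split the global index into separate vertex and edge indexes.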
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f164d1d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f164d1d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f164d1d0 Branch: refs/heads/master Commit: f164d1d0984d53dfca7f5a0c87355fbe7b7942cf Parents: 63012f8 Author: DO YUNG YOON <[email protected]> Authored: Mon Feb 12 16:08:16 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Feb 12 16:08:16 2018 +0900 ---------------------------------------------------------------------- .../s2graph/core/index/ESIndexProvider.scala | 16 +++++------- .../s2graph/core/index/IndexProvider.scala | 2 ++ .../core/index/LuceneIndexProvider.scala | 18 ++++++++----- .../s2graph/core/mysqls/GlobalIndex.scala | 5 +++- .../s2graph/core/index/IndexProviderTest.scala | 27 +++++++++++++------- 5 files changed, 42 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala index cb74cf1..6499874 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala @@ -2,7 +2,7 @@ package org.apache.s2graph.core.index import java.util -import com.sksamuel.elastic4s.ElasticsearchClientUri +import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType} import com.sksamuel.elastic4s.http.ElasticDsl._ import com.sksamuel.elastic4s.http.HttpClient import com.typesafe.config.Config @@ -16,6 +16,7 @@ import play.api.libs.json.Json 
import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.Try class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider { import GlobalIndex._ @@ -25,11 +26,8 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind implicit val executor = ec - val esClientUri = "localhost" -// // config.getString("es.index.provider.client.uri") + val esClientUri = Try(config.getString("es.index.provider.client.uri")).getOrElse("localhost") val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200)) -// val node = LocalNode("elasticsearch", "./es") -// val client = node.http(true) val WaitTime = Duration("60 seconds") @@ -87,7 +85,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { val bulkRequests = vertices.flatMap { vertex => toFields(vertex, forceToIndex).toSeq.map { fields => - indexInto(GlobalIndex.TableName).fields(fields) + update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields) } } @@ -115,7 +113,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { val bulkRequests = edges.flatMap { edge => toFields(edge, forceToIndex).toSeq.map { fields => - indexInto(GlobalIndex.TableName).fields(fields) + update(edge.edgeId.toString()).in(new IndexAndType(GlobalIndex.EdgeIndexName, GlobalIndex.TypeName)).docAsUpsert(fields) } } @@ -142,7 +140,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind val queryString = buildQueryString(hasContainers) client.execute { - search(GlobalIndex.TableName).query(queryString) + 
search(GlobalIndex.EdgeIndexName).query(queryString) }.map { ret => ret match { case Left(failure) => @@ -172,7 +170,7 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind val queryString = buildQueryString(hasContainers) client.execute { - search(GlobalIndex.TableName).query(queryString) + search(GlobalIndex.VertexIndexName).query(queryString) }.map { ret => ret match { case Left(failure) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index 864209f..ae632df 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -35,7 +35,9 @@ import scala.util.Try object IndexProvider { import GlobalIndex._ + //TODO: Fix Me val hitsPerPage = 100000 + val IdField = "id" def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala index b750499..8cafb89 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala @@ -5,7 +5,7 @@ import java.util import com.typesafe.config.Config import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.document.{Document, Field, StringField} -import 
org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} +import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term} import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} import org.apache.lucene.search.IndexSearcher import org.apache.lucene.store.{BaseDirectory, RAMDirectory} @@ -100,11 +100,13 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { Future.successful(mutateEdges(edges, forceToIndex)) override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = { - val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName) + val writer = getOrElseCreateIndexWriter(GlobalIndex.VertexIndexName) vertices.foreach { vertex => toDocument(vertex, forceToIndex).foreach { doc => - writer.addDocument(doc) + val vId = vertex.id.toString() + + writer.updateDocument(new Term(IdField, vId), doc) } } @@ -117,11 +119,13 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { Future.successful(mutateVertices(vertices, forceToIndex)) override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = { - val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName) + val writer = getOrElseCreateIndexWriter(GlobalIndex.EdgeIndexName) edges.foreach { edge => toDocument(edge, forceToIndex).foreach { doc => - writer.addDocument(doc) + val eId = edge.edgeId.toString + + writer.updateDocument(new Term(IdField, eId), doc) } } @@ -139,7 +143,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { try { val q = new QueryParser(field, analyzer).parse(queryString) - val reader = DirectoryReader.open(directories(GlobalIndex.TableName)) + val reader = DirectoryReader.open(directories(GlobalIndex.EdgeIndexName)) val searcher = new IndexSearcher(reader) val docs = searcher.search(q, hitsPerPage) @@ -169,7 +173,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { try { val q = new QueryParser(field, 
analyzer).parse(queryString) - val reader = DirectoryReader.open(directories(GlobalIndex.TableName)) + val reader = DirectoryReader.open(directories(GlobalIndex.VertexIndexName)) val searcher = new IndexSearcher(reader) val docs = searcher.search(q, hitsPerPage) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala index d756ddf..501a964 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala @@ -33,7 +33,10 @@ object GlobalIndex extends Model[GlobalIndex] { val VertexType = "vertex" val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField) - val TableName = "global_indices" +// val IndexName = "global_indices" + val VertexIndexName = "global_vertex_index" + val EdgeIndexName = "global_edge_index" + val TypeName = "test" def apply(rs: WrappedResultSet): GlobalIndex = { GlobalIndex(rs.intOpt("id"), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f164d1d0/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala index 9ed12e6..4889412 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala @@ -43,7 +43,6 @@ class IndexProviderTest extends IntegrateCommon { val testColumn = ServiceColumn.find(testService.id.get, TestUtil.testColumnName).get 
val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L) val indexPropsColumnMeta = testColumn.metasInvMap("age") - ColumnMeta.updateStoreInGlobalIndex(indexPropsColumnMeta.id.get, storeInGlobalIndex = true) val propsWithTs = Map( indexPropsColumnMeta -> InnerVal.withInt(1, "v4") @@ -57,12 +56,22 @@ class IndexProviderTest extends IntegrateCommon { val otherVertex = graph.elementBuilder.newVertex(vertexId) S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs) - val numOfOthers = 10 - val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex) + val numOfOthers = 20 + val vertices = Seq(vertex) ++ (10 until numOfOthers).map { ith => + val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(ith) + + val v = graph.elementBuilder.newVertex(vertexId) + S2Vertex.fillPropsWithTs(v, otherPropsWithTs) + + v + } println(s"[# of vertices]: ${vertices.size}") vertices.foreach(v => println(s"[Vertex]: $v, ${v.props}")) - indexProvider.mutateVertices(vertices) + indexProvider.mutateVertices(vertices, forceToIndex = true) + + // enough time for elastic search to persist docs. 
+ Thread.sleep(1000) (0 until numOfTry).foreach { ith => val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1))) @@ -85,11 +94,8 @@ class IndexProviderTest extends IntegrateCommon { val otherVertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L) val vertex = graph.elementBuilder.newVertex(vertexId) val otherVertex = graph.elementBuilder.newVertex(otherVertexId) - val weightMeta = testLabel.metaPropsInvMap("weight").copy(storeInGlobalIndex = true) - val timeMeta = testLabel.metaPropsInvMap("time").copy(storeInGlobalIndex = true) - - LabelMeta.updateStoreInGlobalIndex(weightMeta.id.get, storeInGlobalIndex = true) - LabelMeta.updateStoreInGlobalIndex(timeMeta.id.get, storeInGlobalIndex = true) + val weightMeta = testLabel.metaPropsInvMap("weight") + val timeMeta = testLabel.metaPropsInvMap("time") val propsWithTs = Map( weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"), @@ -109,6 +115,9 @@ class IndexProviderTest extends IntegrateCommon { edges.foreach(e => println(s"[Edge]: $e")) indexProvider.mutateEdges(edges, forceToIndex = true) + // enough time for elastic search to persist docs. + Thread.sleep(1000) + // match (0 until numOfTry).foreach { _ => val hasContainers = Seq(new HasContainer(timeMeta.name, P.eq(Int.box(10))),
