Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 3cc98da8a -> 8f9214e82


[S2GRAPH-175]: Provide Elastic Search Index Provider.
- add ESIndexProvider.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/939e5f3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/939e5f3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/939e5f3b

Branch: refs/heads/master
Commit: 939e5f3b5b92d11c47e8b60e824464378530cd2a
Parents: 3cc98da
Author: DO YUNG YOON <[email protected]>
Authored: Tue Jan 30 11:58:05 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Jan 30 11:58:24 2018 +0900

----------------------------------------------------------------------
 project/Common.scala                            |   2 +
 s2core/build.sbt                                |   5 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  |  21 +-
 .../org/apache/s2graph/core/S2GraphLike.scala   |  11 +-
 .../s2graph/core/index/ESIndexProvider.scala    | 206 +++++++++++++++++
 .../s2graph/core/index/IndexProvider.scala      | 206 +----------------
 .../core/index/LuceneIndexProvider.scala        | 208 +++++++++++++++++
 .../s2graph/core/mysqls/GlobalIndex.scala       |   2 +
 .../apache/s2graph/core/types/VertexId.scala    |  12 +-
 .../core/Integrate/IntegrateCommon.scala        |   8 +-
 .../s2graph/core/index/IndexProviderTest.scala  | 226 ++++++++++---------
 .../core/tinkerpop/S2GraphProvider.scala        |   1 +
 12 files changed, 577 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index b965aed..d3b8d93 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -28,6 +28,8 @@ object Common {
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
+  val elastic4sVersion = "6.1.0"
+
   /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging 
libraries to forward JCL and JUL logs to SLF4j */
   val loggingRuntime = Seq(
     "log4j" % "log4j" % "1.2.17",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 804c57c..110d5e5 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -50,7 +50,10 @@ libraryDependencies ++= Seq(
   "org.apache.lucene" % "lucene-core" % "6.6.0",
   "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
   "org.rocksdb" % "rocksdbjni" % "5.8.0",
-  "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
+  "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0",
+  "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion 
excludeLogging(),
+  "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion 
excludeLogging(),
+  "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion 
excludeLogging()
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 3eb1fa7..f52acf3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -377,20 +377,19 @@ case class S2Edge(override val innerGraph: S2GraphLike,
 object EdgeId {
   val EdgeIdDelimiter = ","
   def fromString(s: String): EdgeId = {
-//    val Array(src, tgt, labelName, dir, ts) = s.split(EdgeIdDelimiter)
-//    val label = Label.findByName(labelName).getOrElse(throw 
LabelNotExistException(labelName))
-//    val srcColumn = label.srcColumnWithDir(GraphUtil.toDirection(dir))
-//    val tgtColumn = label.tgtColumnWithDir(GraphUtil.toDirection(dir))
-//    EdgeId(
-//      JSONParser.toInnerVal(src, srcColumn.columnType, label.schemaVersion),
-//      JSONParser.toInnerVal(tgt, tgtColumn.columnType, label.schemaVersion),
-//      labelName,
-//      dir,
-//      ts.toLong
-//    )
     val js = Json.parse(s)
     s2EdgeIdReads.reads(Json.parse(s)).get
   }
+
+  def isValid(edgeId: EdgeId): Option[EdgeId] = {
+    VertexId.isValid(edgeId.srcVertexId).flatMap { _ =>
+      VertexId.isValid(edgeId.tgtVertexId).flatMap { _ =>
+        Label.findByName(edgeId.labelName).map { _ =>
+          edgeId
+        }
+      }
+    }
+  }
 }
 
 case class EdgeId(srcVertexId: VertexId,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index bb36a33..f639e84 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -298,8 +298,15 @@ trait S2GraphLike extends Graph {
                 belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
     val vertex = elementBuilder.newVertex(id, ts, props, op, belongLabelIds)
 
-    val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
-      if (rets.forall(_.isSuccess)) vertex
+    val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
+      if (rets.forall(_.isSuccess)) {
+        indexProvider.mutateVerticesAsync(Seq(vertex)).map { ls =>
+          if (ls.forall(identity)) vertex
+          else {
+            throw new RuntimeException("indexVertex failed.")
+          }
+        }
+      }
       else throw new RuntimeException("addVertex failed.")
     }
     Await.ready(future, WaitTimeout)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
new file mode 100644
index 0000000..6486fa9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -0,0 +1,206 @@
+package org.apache.s2graph.core.index
+
+import java.util
+
+import com.sksamuel.elastic4s.ElasticsearchClientUri
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.HttpClient
+import com.typesafe.config.Config
+import org.apache.s2graph.core.io.Conversions
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
+import play.api.libs.json.Json
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends 
IndexProvider {
+  import GlobalIndex._
+  import IndexProvider._
+
+  import scala.collection.mutable
+
+  implicit val executor = ec
+
+  val esClientUri = "localhost"
+//  //    config.getString("es.index.provider.client.uri")
+  val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200))
+//  val node = LocalNode("elasticsearch", "./es")
+//  val client = node.http(true)
+
+  val WaitTime = Duration("60 seconds")
+
+  private def toFields(globalIndex: GlobalIndex, vertex: S2VertexLike): 
Option[Map[String, Any]] = {
+    val props = vertex.props.asScala
+    val filtered = props.filterKeys(globalIndex.propNamesSet)
+
+    if (filtered.isEmpty) None
+    else {
+      val fields = mutable.Map.empty[String, Any]
+
+      fields += (vidField -> vertex.id.toString())
+      fields += (serviceField -> vertex.serviceName)
+      fields += (serviceColumnField -> vertex.columnName)
+
+      filtered.foreach { case (dim, s2VertexProperty) =>
+        s2VertexProperty.columnMeta.dataType match {
+          case "string" => fields += (dim -> 
s2VertexProperty.innerVal.value.toString)
+          case _ => fields += (dim -> s2VertexProperty.innerVal.value)
+        }
+      }
+
+      Option(fields.toMap)
+    }
+  }
+
+  private def toFields(globalIndex: GlobalIndex, edge: S2EdgeLike): 
Option[Map[String, Any]] = {
+    val props = edge.getPropsWithTs().asScala
+    val filtered = props.filterKeys(globalIndex.propNamesSet)
+
+    if (filtered.isEmpty) None
+    else {
+      val fields = mutable.Map.empty[String, Any]
+
+      fields += (eidField -> edge.edgeId.toString)
+      fields += (serviceField -> edge.serviceName)
+      fields += (labelField -> edge.label())
+
+      filtered.foreach { case (dim, s2Property) =>
+        s2Property.labelMeta.dataType match {
+          case "string" => fields += (dim -> 
s2Property.innerVal.value.toString)
+          case _ => fields += (dim -> s2Property.innerVal.value)
+        }
+      }
+
+      Option(fields.toMap)
+    }
+  }
+
+  override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): 
Future[Seq[Boolean]] = {
+    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType)
+
+    val bulkRequests = globalIndexOptions.flatMap { globalIndex =>
+      vertices.flatMap { vertex =>
+        toFields(globalIndex, vertex).toSeq.map { fields =>
+          indexInto(globalIndex.backendIndexNameWithType).fields(fields)
+        }
+      }
+    }
+    if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
+    else {
+      client.execute {
+        val requests = bulk(requests = bulkRequests)
+
+        requests
+      }.map { ret =>
+        ret match {
+          case Left(failure) => vertices.map(_ => false)
+          case Right(results) => vertices.map(_ => true)
+        }
+      }
+    }
+  }
+
+  override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] =
+    Await.result(mutateVerticesAsync(vertices), WaitTime)
+
+  override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] =
+    Await.result(mutateEdgesAsync(edges), WaitTime)
+
+  override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] 
= {
+    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
+
+    val bulkRequests = globalIndexOptions.flatMap { globalIndex =>
+      edges.flatMap { edge =>
+        toFields(globalIndex, edge).toSeq.map { fields =>
+          indexInto(globalIndex.backendIndexNameWithType).fields(fields)
+        }
+      }
+    }
+
+    if (bulkRequests.isEmpty) Future.successful(edges.map(_ => true))
+    else {
+      client.execute {
+        bulk(bulkRequests)
+      }.map { ret =>
+        ret match {
+          case Left(failure) => edges.map(_ => false)
+          case Right(results) => edges.map(_ => true)
+        }
+      }
+    }
+  }
+
+  override def fetchEdgeIds(hasContainers: util.List[HasContainer]): 
util.List[EdgeId] =
+    Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime)
+
+  override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): 
Future[util.List[EdgeId]] = {
+    val field = eidField
+    val ids = new java.util.HashSet[EdgeId]
+
+    GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers) match {
+      case None => Future.successful(new util.ArrayList[EdgeId](ids))
+      case Some(globalIndex) =>
+        val queryString = buildQueryString(hasContainers)
+
+        client.execute {
+          search(globalIndex.backendIndexName).query(queryString)
+        }.map { ret =>
+          ret match {
+            case Left(failure) =>
+            case Right(results) =>
+              results.result.hits.hits.foreach { searchHit =>
+                searchHit.sourceAsMap.get(field).foreach { idValue =>
+                  val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get
+
+                  //TODO: Come up with better way to filter out hits with 
invalid meta.
+                  EdgeId.isValid(id).foreach(ids.add)
+                }
+              }
+          }
+
+          new util.ArrayList[EdgeId](ids)
+        }
+    }
+  }
+
+
+  override def fetchVertexIds(hasContainers: util.List[HasContainer]): 
util.List[VertexId] =
+    Await.result(fetchVertexIdsAsync(hasContainers), WaitTime)
+
+  override def fetchVertexIdsAsync(hasContainers: util.List[HasContainer]): 
Future[util.List[VertexId]] = {
+    val field = vidField
+    val ids = new java.util.HashSet[VertexId]
+
+    GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers) match {
+      case None => Future.successful(new util.ArrayList[VertexId](ids))
+      case Some(globalIndex) =>
+        val queryString = buildQueryString(hasContainers)
+
+        client.execute {
+          search(globalIndex.backendIndexName).query(queryString)
+        }.map { ret =>
+          ret match {
+            case Left(failure) =>
+            case Right(results) =>
+              results.result.hits.hits.foreach { searchHit =>
+                searchHit.sourceAsMap.get(field).foreach { idValue =>
+                  val id = 
Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get
+                  //TODO: Come up with better way to filter out hits with 
invalid meta.
+                  VertexId.isValid(id).foreach(ids.add)
+                }
+              }
+          }
+
+          new util.ArrayList[VertexId](ids)
+        }
+    }
+  }
+
+  override def shutdown(): Unit = {
+    client.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index 2411e65..dcddadf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -22,35 +22,28 @@ package org.apache.s2graph.core.index
 import java.util
 
 import com.typesafe.config.Config
-import org.apache.lucene.analysis.standard.StandardAnalyzer
-import org.apache.lucene.document._
-import org.apache.lucene.index.{DirectoryReader, IndexWriter, 
IndexWriterConfig}
-import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
-import org.apache.lucene.search.IndexSearcher
-import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
-import org.apache.s2graph.core.io.Conversions
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls._
-import org.apache.s2graph.core.types.{InnerValLike, VertexId}
-import org.apache.s2graph.core.utils.logger
-import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains, P}
+import org.apache.s2graph.core.types.VertexId
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP}
+import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains}
 import org.apache.tinkerpop.gremlin.structure.T
-import play.api.libs.json.Json
 
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
 
 object IndexProvider {
   import GlobalIndex._
   val hitsPerPage = 100000
 
-  def apply(config: Config): IndexProvider = {
-    val indexProviderType = "lucene"
-//      if (config.hasPath("index.provider")) 
config.getString("index.provider") else "lucene"
+  def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = {
+
+    val indexProviderType = Try { config.getString("index.provider") 
}.getOrElse("lucene")
 
     indexProviderType match {
       case "lucene" => new LuceneIndexProvider(config)
+      case "es" => new ESIndexProvider(config)
     }
   }
 
@@ -135,186 +128,3 @@ trait IndexProvider {
 
   def shutdown(): Unit
 }
-
-class LuceneIndexProvider(config: Config) extends IndexProvider {
-  import IndexProvider._
-  import scala.collection.mutable
-  import scala.collection.JavaConverters._
-  import GlobalIndex._
-
-  val analyzer = new StandardAnalyzer()
-  val writers = mutable.Map.empty[String, IndexWriter]
-  val directories = mutable.Map.empty[String, BaseDirectory]
-
-  private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = {
-    writers.getOrElseUpdate(indexName, {
-      val dir = directories.getOrElseUpdate(indexName, new RAMDirectory())
-      val indexConfig = new IndexWriterConfig(analyzer)
-      new IndexWriter(dir, indexConfig)
-    })
-  }
-
-  private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): 
Option[Document] = {
-    val props = vertex.props.asScala
-    val exist = props.exists(t => globalIndex.propNamesSet(t._1))
-    if (!exist) None
-    else {
-      val doc = new Document()
-      val id = vertex.id.toString
-
-      doc.add(new StringField(vidField, id, Field.Store.YES))
-      doc.add(new StringField(serviceField, vertex.serviceName, 
Field.Store.YES))
-      doc.add(new StringField(serviceColumnField, vertex.columnName, 
Field.Store.YES))
-
-      props.foreach { case (dim, s2VertexProperty) =>
-        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES 
else Field.Store.NO
-        val field = s2VertexProperty.columnMeta.dataType match {
-          case "string" => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
-          case _ => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
-        }
-        doc.add(field)
-      }
-
-      Option(doc)
-    }
-  }
-
-  private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): 
Option[Document] = {
-    val props = edge.getPropsWithTs().asScala
-    val exist = props.exists(t => globalIndex.propNamesSet(t._1))
-    if (!exist) None
-    else {
-      val doc = new Document()
-      val id = edge.edgeId.toString
-
-      doc.add(new StringField(eidField, id, Field.Store.YES))
-      doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES))
-      doc.add(new StringField(labelField, edge.label(), Field.Store.YES))
-
-      props.foreach { case (dim, s2Property) =>
-        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES 
else Field.Store.NO
-        val field = s2Property.labelMeta.dataType match {
-          case "string" => new StringField(dim, 
s2Property.innerVal.value.toString, shouldIndex)
-          case _ => new StringField(dim, s2Property.innerVal.value.toString, 
shouldIndex)
-        }
-        doc.add(field)
-      }
-
-      Option(doc)
-    }
-  }
-
-  override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = {
-    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType)
-
-    globalIndexOptions.map { globalIndex =>
-      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
-
-      vertices.foreach { vertex =>
-        toDocument(globalIndex, vertex).foreach { doc =>
-          writer.addDocument(doc)
-        }
-      }
-
-      writer.commit()
-    }
-
-    vertices.map(_ => true)
-  }
-
-  override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = {
-    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
-
-    globalIndexOptions.map { globalIndex =>
-      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
-
-      edges.foreach { edge =>
-        toDocument(globalIndex, edge).foreach { doc =>
-          writer.addDocument(doc)
-        }
-      }
-
-      writer.commit()
-    }
-
-    edges.map(_ => true)
-  }
-
-  override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): 
java.util.List[EdgeId] = {
-    val field = eidField
-    val ids = new java.util.HashSet[EdgeId]
-
-    GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { 
globalIndex =>
-      val queryString = buildQueryString(hasContainers)
-
-      try {
-        val q = new QueryParser(field, analyzer).parse(queryString)
-
-        val reader = DirectoryReader.open(directories(globalIndex.indexName))
-        val searcher = new IndexSearcher(reader)
-
-        val docs = searcher.search(q, hitsPerPage)
-
-        docs.scoreDocs.foreach { scoreDoc =>
-          val document = searcher.doc(scoreDoc.doc)
-          val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
-          ids.add(id);
-        }
-
-        reader.close()
-        ids
-      } catch {
-        case ex: ParseException =>
-          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
-          ids
-      }
-    }
-
-    new util.ArrayList[EdgeId](ids)
-  }
-
-  override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): 
java.util.List[VertexId] = {
-    val field = vidField
-    val ids = new java.util.HashSet[VertexId]
-
-    GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { 
globalIndex =>
-      val queryString = buildQueryString(hasContainers)
-
-      try {
-        val q = new QueryParser(field, analyzer).parse(queryString)
-
-        val reader = DirectoryReader.open(directories(globalIndex.indexName))
-        val searcher = new IndexSearcher(reader)
-
-        val docs = searcher.search(q, hitsPerPage)
-
-        docs.scoreDocs.foreach { scoreDoc =>
-          val document = searcher.doc(scoreDoc.doc)
-          val id = 
Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
-          ids.add(id)
-        }
-
-        reader.close()
-        ids
-      } catch {
-        case ex: ParseException =>
-          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
-          ids
-      }
-    }
-
-    new util.ArrayList[VertexId](ids)
-  }
-
-  override def shutdown(): Unit = {
-    writers.foreach { case (_, writer) => writer.close() }
-  }
-
-  override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): 
Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
-
-  override def fetchVertexIdsAsync(hasContainers: 
java.util.List[HasContainer]): Future[util.List[VertexId]] = 
Future.successful(fetchVertexIds(hasContainers))
-
-  override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): 
Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
-
-  override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] 
= Future.successful(mutateEdges(edges))
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
new file mode 100644
index 0000000..0670d48
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -0,0 +1,208 @@
+package org.apache.s2graph.core.index
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.lucene.analysis.standard.StandardAnalyzer
+import org.apache.lucene.document.{Document, Field, StringField}
+import org.apache.lucene.index.{DirectoryReader, IndexWriter, 
IndexWriterConfig}
+import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
+import org.apache.lucene.search.IndexSearcher
+import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.io.Conversions
+import org.apache.s2graph.core.mysqls.GlobalIndex
+import org.apache.s2graph.core.mysqls.GlobalIndex.{eidField, labelField, 
serviceColumnField, serviceField, vidField}
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core.utils.logger
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
+import play.api.libs.json.Json
+
+import scala.concurrent.Future
+
+
+class LuceneIndexProvider(config: Config) extends IndexProvider {
+  import IndexProvider._
+  import scala.collection.mutable
+  import scala.collection.JavaConverters._
+  import GlobalIndex._
+
+  val analyzer = new StandardAnalyzer()
+  val writers = mutable.Map.empty[String, IndexWriter]
+  val directories = mutable.Map.empty[String, BaseDirectory]
+
+  private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = {
+    writers.getOrElseUpdate(indexName, {
+      val dir = directories.getOrElseUpdate(indexName, new RAMDirectory())
+      val indexConfig = new IndexWriterConfig(analyzer)
+      new IndexWriter(dir, indexConfig)
+    })
+  }
+
+  private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): 
Option[Document] = {
+    val props = vertex.props.asScala
+    val filtered = props.filterKeys(globalIndex.propNamesSet)
+
+    if (filtered.isEmpty) None
+    else {
+      val doc = new Document()
+      val id = vertex.id.toString
+
+      doc.add(new StringField(vidField, id, Field.Store.YES))
+      doc.add(new StringField(serviceField, vertex.serviceName, 
Field.Store.YES))
+      doc.add(new StringField(serviceColumnField, vertex.columnName, 
Field.Store.YES))
+
+      filtered.foreach { case (dim, s2VertexProperty) =>
+        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES 
else Field.Store.NO
+        val field = s2VertexProperty.columnMeta.dataType match {
+          case "string" => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
+          case _ => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
+        }
+        doc.add(field)
+      }
+
+      Option(doc)
+    }
+  }
+
+  private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): 
Option[Document] = {
+    val props = edge.getPropsWithTs().asScala
+    val filtered = props.filterKeys(globalIndex.propNamesSet)
+
+    if (filtered.isEmpty) None
+    else {
+      val doc = new Document()
+      val id = edge.edgeId.toString
+
+      doc.add(new StringField(eidField, id, Field.Store.YES))
+      doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES))
+      doc.add(new StringField(labelField, edge.label(), Field.Store.YES))
+
+      filtered.foreach { case (dim, s2Property) =>
+        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES 
else Field.Store.NO
+        val field = s2Property.labelMeta.dataType match {
+          case "string" => new StringField(dim, 
s2Property.innerVal.value.toString, shouldIndex)
+          case _ => new StringField(dim, s2Property.innerVal.value.toString, 
shouldIndex)
+        }
+        doc.add(field)
+      }
+
+      Option(doc)
+    }
+  }
+
+  override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = {
+    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType)
+
+    globalIndexOptions.map { globalIndex =>
+      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
+
+      vertices.foreach { vertex =>
+        toDocument(globalIndex, vertex).foreach { doc =>
+          writer.addDocument(doc)
+        }
+      }
+
+      writer.commit()
+    }
+
+    vertices.map(_ => true)
+  }
+
+  override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = {
+    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
+
+    globalIndexOptions.map { globalIndex =>
+      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
+
+      edges.foreach { edge =>
+        toDocument(globalIndex, edge).foreach { doc =>
+          writer.addDocument(doc)
+        }
+      }
+
+      writer.commit()
+    }
+
+    edges.map(_ => true)
+  }
+
+  override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): 
java.util.List[EdgeId] = {
+    val field = eidField
+    val ids = new java.util.HashSet[EdgeId]
+
+    GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { 
globalIndex =>
+      val queryString = buildQueryString(hasContainers)
+
+      try {
+        val q = new QueryParser(field, analyzer).parse(queryString)
+
+        val reader = DirectoryReader.open(directories(globalIndex.indexName))
+        val searcher = new IndexSearcher(reader)
+
+        val docs = searcher.search(q, hitsPerPage)
+
+        docs.scoreDocs.foreach { scoreDoc =>
+          val document = searcher.doc(scoreDoc.doc)
+          val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
+          ids.add(id);
+        }
+
+        reader.close()
+        ids
+      } catch {
+        case ex: ParseException =>
+          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+          ids
+      }
+    }
+
+    new util.ArrayList[EdgeId](ids)
+  }
+
+  override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): 
java.util.List[VertexId] = {
+    val field = vidField
+    val ids = new java.util.HashSet[VertexId]
+
+    GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { 
globalIndex =>
+      val queryString = buildQueryString(hasContainers)
+
+      try {
+        val q = new QueryParser(field, analyzer).parse(queryString)
+
+        val reader = DirectoryReader.open(directories(globalIndex.indexName))
+        val searcher = new IndexSearcher(reader)
+
+        val docs = searcher.search(q, hitsPerPage)
+
+        docs.scoreDocs.foreach { scoreDoc =>
+          val document = searcher.doc(scoreDoc.doc)
+          val id = 
Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
+          ids.add(id)
+        }
+
+        reader.close()
+        ids
+      } catch {
+        case ex: ParseException =>
+          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+          ids
+      }
+    }
+
+    new util.ArrayList[VertexId](ids)
+  }
+
+  override def shutdown(): Unit = {
+    writers.foreach { case (_, writer) => writer.close() }
+  }
+
+  override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): 
Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
+
+  override def fetchVertexIdsAsync(hasContainers: 
java.util.List[HasContainer]): Future[util.List[VertexId]] = 
Future.successful(fetchVertexIds(hasContainers))
+
+  override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): 
Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
+
+  override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] 
= Future.successful(mutateEdges(edges))
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
index a918db5..d756ddf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
@@ -91,5 +91,7 @@ case class GlobalIndex(id: Option[Int],
                        elementType: String,
                        propNames: Seq[String],
                        indexName: String)  {
+  val backendIndexName = indexName + "_" + elementType
+  val backendIndexNameWithType = backendIndexName + "/test1"
   lazy val propNamesSet = propNames.toSet
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
index eb00405..b1de0c4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -59,14 +59,14 @@ object VertexId extends HBaseDeserializable {
   }
 
   def fromString(s: String): VertexId = {
-
-//    val Array(serviceId, columnName, innerValStr) = 
s.split(S2Vertex.VertexLabelDelimiter)
-//    val service = Service.findById(serviceId.toInt)
-//    val column = ServiceColumn.find(service.id.get, 
columnName).getOrElse(throw new LabelNotExistException(columnName))
-//    val innerId = JSONParser.toInnerVal(innerValStr, column.columnType, 
column.schemaVersion)
-//    VertexId(column, innerId)
     s2VertexIdReads.reads(Json.parse(s)).get
   }
+
+  def isValid(vertexId: VertexId): Option[VertexId] = {
+    ServiceColumn.find(vertexId.column.serviceId, 
vertexId.column.columnName).map { column =>
+      vertexId
+    }
+  }
 }
 
 class VertexId (val column: ServiceColumn, val innerId: InnerValLike) extends 
HBaseSerializable with Comparable[VertexId] {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index c720b9f..08f075d 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.Integrate
 
 import com.typesafe.config._
-import org.apache.s2graph.core.mysqls.{Label, Model}
+import org.apache.s2graph.core.mysqls.{GlobalIndex, Label, Model}
 import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core._
@@ -98,6 +98,12 @@ trait IntegrateCommon extends FunSuite with Matchers with 
BeforeAndAfterAll {
       Management.addVertexProp(testServiceName, testColumnName, key, keyType)
     }
 
+    // vertex type global index.
+    val globalVertexIndex = 
management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", 
Seq("age"))
+
+    // edge type global index.
+    val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, 
"test_weight_time_edge", Seq("weight", "time"))
+
     logger.info("[init end]: 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index 8294a8f..3871399 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.index
 
 import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.{Query, QueryParam, S2Vertex, Step}
+import org.apache.s2graph.core._
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.apache.tinkerpop.gremlin.process.traversal.{Order, P}
 import org.apache.s2graph.core.mysqls._
@@ -31,120 +31,122 @@ import org.apache.tinkerpop.gremlin.structure.T
 import scala.collection.JavaConversions._
 
 class IndexProviderTest extends IntegrateCommon {
+  import scala.concurrent.ExecutionContext.Implicits.global
   val indexProvider = IndexProvider(config)
   val numOfTry = 1
 
-  lazy val gIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test1", 
Seq("_timestamp", "weight", "time"))
-
-//  test("test vertex write/query") {
-//    gIndex
-//    import TestUtil._
-////    Management.addVertexProp(testServiceName, testColumnName, "time", 
"long")
-//
-//    val testService = Service.findByName(TestUtil.testServiceName).get
-//    val testColumn = ServiceColumn.find(testService.id.get, 
TestUtil.testColumnName).get
-//    val vertexId = graph.newVertexId(testServiceName)(testColumnName)(1L)
-//
-//    val propsWithTs = Map(
-////      testColumn.metasInvMap("time") -> InnerVal.withLong(1L, "v4")
-//      ColumnMeta.timestamp -> InnerVal.withLong(1L, "v4")
-//    )
-//    val otherPropsWithTs = Map(
-////      testColumn.metasInvMap("time") -> InnerVal.withLong(2L, "v4")
-//      ColumnMeta.timestamp -> InnerVal.withLong(2L, "v4")
-//    )
-//    val vertex = graph.newVertex(vertexId)
-//    S2Vertex.fillPropsWithTs(vertex, propsWithTs)
-//
-//    val otherVertex = graph.newVertex(vertexId)
-//    S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs)
-//
-//    val numOfOthers = 10
-//    val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex)
-//
-//    println(s"[# of vertices]: ${vertices.size}")
-//    vertices.foreach(v => println(s"[Vertex]: $v"))
-//    indexProvider.mutateVertices(vertices)
-//
-//    (0 until numOfTry).foreach { ith =>
-//      val hasContainer = new HasContainer("_timestamp", P.eq(Long.box(1)))
-//
-//      var ids = indexProvider.fetchVertexIds(Seq(hasContainer))
-//      ids.head shouldBe vertex.id
-//
-//      ids.foreach { id =>
-//        println(s"[Id]: $id")
-//      }
-//    }
-//  }
-//
-//  test("test edge write/query ") {
-//    import TestUtil._
-//    val testLabelName = TestUtil.testLabelName
-//    val testLabel = Label.findByName(testLabelName).getOrElse(throw new 
IllegalArgumentException)
-//    val vertexId = graph.newVertexId(testServiceName)(testColumnName)(1L)
-//    val otherVertexId = 
graph.newVertexId(testServiceName)(testColumnName)(2L)
-//    val vertex = graph.newVertex(vertexId)
-//    val otherVertex = graph.newVertex(otherVertexId)
-//
-//    val propsWithTs = Map(
-//      LabelMeta.timestamp -> InnerValLikeWithTs.withLong(1L, 1L, "v4"),
-//      testLabel.metaPropsInvMap("time") -> InnerValLikeWithTs.withLong(10L, 
1L, "v4")
-//    )
-//    val otherPropsWithTs = Map(
-//      LabelMeta.timestamp -> InnerValLikeWithTs.withLong(2L, 2L, "v4"),
-//      testLabel.metaPropsInvMap("time") -> InnerValLikeWithTs.withLong(20L, 
2L, "v4")
-//    )
-//    val edge = graph.newEdge(vertex, vertex, testLabel, 0, propsWithTs = 
propsWithTs)
-//    val otherEdge = graph.newEdge(otherVertex, otherVertex, testLabel, 0, 
propsWithTs = otherPropsWithTs)
-//    val numOfOthers = 10
-//    val edges = Seq(edge) ++ (0 until numOfOthers).map(_ => otherEdge)
-//
-//    println(s"[# of edges]: ${edges.size}")
-//    edges.foreach(e => println(s"[Edge]: $e"))
-//    indexProvider.mutateEdges(edges)
-//
-//    // match
-//    (0 until numOfTry).foreach { _ =>
-//      val hasContainers = Seq(new HasContainer("time", P.eq(Int.box(10))),
-//        new HasContainer("_timestamp", P.eq(Int.box(1))))
-//      val ids = indexProvider.fetchEdgeIds(hasContainers)
-//      ids.head shouldBe edge.edgeId
-//
-//      ids.foreach { id =>
-//        println(s"[Id]: $id")
-//      }
-//    }
-//
-//    // match and not
-//    (0 until numOfTry).foreach { _ =>
-//      val hasContainers = Seq(new HasContainer("time", P.eq(Int.box(20))),
-//        new HasContainer("_timestamp", P.neq(Int.box(1))))
-//      val ids = indexProvider.fetchEdgeIds(hasContainers)
-//      //    ids.size shouldBe 0
-//      // distinct make ids size to 1
-////      ids.size shouldBe numOfOthers
-//
-//      ids.foreach { id =>
-//        id shouldBe otherEdge.edgeId
-//        println(s"[Id]: $id")
-//      }
-//    }
-//
-//    // range
-//    (0 until numOfTry).foreach { _ =>
-//      val hasContainers = Seq(new HasContainer("time",
-//        P.inside(Int.box(0), Int.box(11))))
-//      val ids = indexProvider.fetchEdgeIds(hasContainers)
-//      //    ids.size shouldBe 0
-//      ids.size shouldBe 1
-//
-//      ids.foreach { id =>
-//        id shouldBe edge.edgeId
-//        println(s"[Id]: $id")
-//      }
-//    }
-//  }
+  test("test vertex write/query") {
+    import TestUtil._
+
+    val testService = Service.findByName(TestUtil.testServiceName).get
+    val testColumn = ServiceColumn.find(testService.id.get, 
TestUtil.testColumnName).get
+    val vertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L)
+    val indexPropsColumnMeta = testColumn.metasInvMap("age")
+
+
+    val propsWithTs = Map(
+      indexPropsColumnMeta -> InnerVal.withInt(1, "v4")
+    )
+    val otherPropsWithTs = Map(
+      indexPropsColumnMeta -> InnerVal.withInt(2, "v4")
+    )
+    val vertex = graph.elementBuilder.newVertex(vertexId)
+    S2Vertex.fillPropsWithTs(vertex, propsWithTs)
+
+    val otherVertex = graph.elementBuilder.newVertex(vertexId)
+    S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs)
+
+    val numOfOthers = 10
+    val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex)
+
+    println(s"[# of vertices]: ${vertices.size}")
+    vertices.foreach(v => println(s"[Vertex]: $v, ${v.props}"))
+    indexProvider.mutateVertices(vertices)
+
+    (0 until numOfTry).foreach { ith =>
+      val hasContainer = new HasContainer(indexPropsColumnMeta.name, 
P.eq(Long.box(1)))
+
+      var ids = indexProvider.fetchVertexIds(Seq(hasContainer))
+      ids.head shouldBe vertex.id
+
+      ids.foreach { id =>
+        println(s"[Id]: $id")
+      }
+    }
+  }
+
+  test("test edge write/query ") {
+    import TestUtil._
+    val testLabelName = TestUtil.testLabelName
+    val testLabel = Label.findByName(testLabelName).getOrElse(throw new 
IllegalArgumentException)
+
+    val vertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L)
+    val otherVertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L)
+    val vertex = graph.elementBuilder.newVertex(vertexId)
+    val otherVertex = graph.elementBuilder.newVertex(otherVertexId)
+    val weightMeta = testLabel.metaPropsInvMap("weight")
+    val timeMeta = testLabel.metaPropsInvMap("time")
+
+    val propsWithTs = Map(
+      weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"),
+      timeMeta -> InnerValLikeWithTs.withLong(10L, 1L, "v4")
+    )
+    val otherPropsWithTs = Map(
+      weightMeta -> InnerValLikeWithTs.withLong(2L, 2L, "v4"),
+      timeMeta -> InnerValLikeWithTs.withLong(20L, 2L, "v4")
+    )
+
+    val edge = graph.elementBuilder.newEdge(vertex, vertex, testLabel, 0, 
propsWithTs = propsWithTs)
+    val otherEdge = graph.elementBuilder.newEdge(otherVertex, otherVertex, 
testLabel, 0, propsWithTs = otherPropsWithTs)
+    val numOfOthers = 10
+    val edges = Seq(edge) ++ (0 until numOfOthers).map(_ => otherEdge)
+
+    println(s"[# of edges]: ${edges.size}")
+    edges.foreach(e => println(s"[Edge]: $e"))
+    indexProvider.mutateEdges(edges)
+
+    // match
+    (0 until numOfTry).foreach { _ =>
+      val hasContainers = Seq(new HasContainer(timeMeta.name, 
P.eq(Int.box(10))),
+        new HasContainer(weightMeta.name, P.eq(Int.box(1))))
+
+      val ids = indexProvider.fetchEdgeIds(hasContainers)
+      ids.head shouldBe edge.edgeId
+
+      ids.foreach { id =>
+        println(s"[Id]: $id")
+      }
+    }
+
+    // match and not
+    (0 until numOfTry).foreach { _ =>
+      val hasContainers = Seq(new HasContainer(timeMeta.name, 
P.eq(Int.box(20))),
+        new HasContainer(weightMeta.name, P.neq(Int.box(1))))
+      val ids = indexProvider.fetchEdgeIds(hasContainers)
+      //    ids.size shouldBe 0
+      // distinct make ids size to 1
+//      ids.size shouldBe numOfOthers
+
+      ids.foreach { id =>
+        id shouldBe otherEdge.edgeId
+        println(s"[Id]: $id")
+      }
+    }
+
+    // range
+    (0 until numOfTry).foreach { _ =>
+      val hasContainers = Seq(new HasContainer(timeMeta.name,
+        P.inside(Int.box(0), Int.box(11))))
+      val ids = indexProvider.fetchEdgeIds(hasContainers)
+      //    ids.size shouldBe 0
+      ids.size shouldBe 1
+
+      ids.foreach { id =>
+        id shouldBe edge.edgeId
+        println(s"[Id]: $id")
+      }
+    }
+  }
 
   test("buildQuerySingleString") {
     // (weight: 34) AND (weight: [0.5 TO *] AND price: 30)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/939e5f3b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index 3a73d20..bf3291e 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -96,6 +96,7 @@ class S2GraphProvider extends AbstractGraphProvider {
     val defaultServiceColumn = ServiceColumn.find(defaultService.id.get, 
S2Graph.DefaultColumnName).getOrElse(throw new IllegalStateException("default 
column is not initialized."))
 
     val allProps = scala.collection.mutable.Set.empty[Prop]
+
     var knowsProp = Vector(
       Prop("weight", "0.0", "double"),
       Prop("data", "-", "string"),


Reply via email to