Move the global index setting under ColumnMeta and LabelMeta.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/63012f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/63012f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/63012f8e

Branch: refs/heads/master
Commit: 63012f8e51b65626b3b62fa20cded2b6c809da4e
Parents: 045392f
Author: DO YUNG YOON <[email protected]>
Authored: Fri Feb 9 21:57:23 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Sat Feb 10 09:03:31 2018 +0900

----------------------------------------------------------------------
 dev_support/graph_mysql/schema.sql              |   2 +
 .../org/apache/s2graph/core/mysqls/schema.sql   |   2 +
 .../org/apache/s2graph/core/Management.scala    |  79 ++++++----
 .../apache/s2graph/core/S2GraphFactory.scala    |  48 +++---
 .../s2graph/core/index/ESIndexProvider.scala    | 140 ++++++++---------
 .../s2graph/core/index/IndexProvider.scala      |   8 +-
 .../core/index/LuceneIndexProvider.scala        | 152 +++++++++----------
 .../apache/s2graph/core/io/Conversions.scala    |   6 +-
 .../apache/s2graph/core/mysqls/ColumnMeta.scala |  32 +++-
 .../org/apache/s2graph/core/mysqls/Label.scala  |   4 +-
 .../apache/s2graph/core/mysqls/LabelMeta.scala  |  25 ++-
 .../apache/s2graph/core/mysqls/Service.scala    |   2 +
 .../core/Integrate/IntegrateCommon.scala        |   6 +-
 .../s2graph/core/index/IndexProviderTest.scala  |  11 +-
 .../s2graph/core/models/GlobalIndexTest.scala   |  81 ----------
 .../core/tinkerpop/S2GraphProvider.scala        |   5 +-
 .../core/tinkerpop/structure/S2GraphTest.scala  |   2 -
 17 files changed, 281 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/dev_support/graph_mysql/schema.sql
----------------------------------------------------------------------
diff --git a/dev_support/graph_mysql/schema.sql 
b/dev_support/graph_mysql/schema.sql
index ea72c63..48b27df 100644
--- a/dev_support/graph_mysql/schema.sql
+++ b/dev_support/graph_mysql/schema.sql
@@ -76,6 +76,7 @@ CREATE TABLE `column_metas` (
        `name` varchar(64) NOT NULL,
        `seq` tinyint   NOT NULL,
        `data_type` varchar(8) NOT NULL DEFAULT 'string',
+       `store_in_global_index` tinyint NOT NULL DEFAULT 0,
        PRIMARY KEY (`id`),
        UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
        INDEX `idx_column_id_seq` (`column_id`, `seq`)
@@ -147,6 +148,7 @@ CREATE TABLE `label_metas` (
        `default_value` varchar(64) NOT NULL,
        `data_type` varchar(8) NOT NULL DEFAULT 'long',
        `used_in_index` tinyint NOT NULL DEFAULT 0,
+       `store_in_global_index` tinyint NOT NULL DEFAULT 0,
        PRIMARY KEY (`id`),
        UNIQUE KEY `ux_label_id_name` (`label_id`, `name`),
        INDEX `idx_label_id_seq` (`label_id`, `seq`)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql 
b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
index f5f6a61..b66b46b 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
@@ -65,6 +65,7 @@ CREATE TABLE `column_metas` (
   `name` varchar(64) NOT NULL,
   `seq` tinyint        NOT NULL,
   `data_type` varchar(8) NOT NULL DEFAULT 'string',
+  `store_in_global_index` tinyint NOT NULL DEFAULT 0,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_column_id_name` (`column_id`, `name`),
   INDEX `idx_column_id_seq` (`column_id`, `seq`)
@@ -136,6 +137,7 @@ CREATE TABLE `label_metas` (
   `default_value` varchar(64) NOT NULL,
   `data_type` varchar(8) NOT NULL DEFAULT 'long',
   `used_in_index` tinyint      NOT NULL DEFAULT 0,
+  `store_in_global_index` tinyint NOT NULL DEFAULT 0,
   PRIMARY KEY (`id`),
   UNIQUE KEY `ux_label_metas_label_id_name` (`label_id`, `name`),
   INDEX `idx_label_metas_label_id_seq` (`label_id`, `seq`)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 8d2d62a..3af8b79 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -63,9 +63,9 @@ object Management {
 
   object JsonModel {
 
-    case class Prop(name: String, defaultValue: String, datatType: String)
+    case class Prop(name: String, defaultValue: String, datatType: String, 
storeInGlobalIndex: Boolean = false)
 
-    object Prop extends ((String, String, String) => Prop)
+    object Prop extends ((String, String, String, Boolean) => Prop)
 
     case class Index(name: String, propNames: Seq[String], direction: 
Option[Int] = None, options: Option[String] = None)
   }
@@ -102,9 +102,10 @@ object Management {
         case Some(service) =>
           val serviceColumn = ServiceColumn.findOrInsert(service.id.get, 
columnName, Some(columnType), schemaVersion, useCache = false)
           for {
-            Prop(propName, defaultValue, dataType) <- props
+            Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
           } yield {
-            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, 
useCache = false)
+            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType,
+              storeInGlobalIndex = storeInGlobalIndex, useCache = false)
           }
       }
     }
@@ -165,7 +166,7 @@ object Management {
       val labelOpt = Label.findByName(labelStr)
       val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr 
not found"))
 
-      LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, 
prop.datatType)
+      LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, 
prop.datatType, prop.storeInGlobalIndex)
     }
   }
 
@@ -175,8 +176,8 @@ object Management {
       val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr 
not found"))
 
       props.map {
-        case Prop(propName, defaultValue, dataType) =>
-          LabelMeta.findOrInsert(label.id.get, propName, defaultValue, 
dataType)
+        case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
+          LabelMeta.findOrInsert(label.id.get, propName, defaultValue, 
dataType, storeInGlobalIndex)
       }
     }
   }
@@ -185,12 +186,13 @@ object Management {
                     columnName: String,
                     propsName: String,
                     propsType: String,
+                    storeInGlobalIndex: Boolean = false,
                     schemaVersion: String = DEFAULT_VERSION): ColumnMeta = {
     val result = for {
       service <- Service.findByName(serviceName, useCache = false)
       serviceColumn <- ServiceColumn.find(service.id.get, columnName)
     } yield {
-        ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType)
+        ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, 
storeInGlobalIndex)
       }
     result.getOrElse({
       throw new RuntimeException(s"add property on vertex failed")
@@ -361,9 +363,10 @@ class Management(graph: S2GraphLike) {
         case Some(service) =>
           val serviceColumn = ServiceColumn.findOrInsert(service.id.get, 
columnName, Some(columnType), schemaVersion, useCache = false)
           for {
-            Prop(propName, defaultValue, dataType) <- props
+            Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
           } yield {
-            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, 
useCache = false)
+            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType,
+              storeInGlobalIndex = storeInGlobalIndex, useCache = false)
           }
           serviceColumn
       }
@@ -466,26 +469,42 @@ class Management(graph: S2GraphLike) {
       old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, 
old.isAsync, old.compressionAlgorithm, old.options)
   }
 
-  def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): 
GlobalIndex =
-    buildGlobalIndex(GlobalIndex.VertexType, name, propNames)
-
-  def buildGlobalVertexIndex(name: String, propNames: Seq[String]): 
GlobalIndex =
-    buildGlobalIndex(GlobalIndex.VertexType, name, propNames)
-
-  def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): 
GlobalIndex =
-    buildGlobalIndex(GlobalIndex.EdgeType, name, propNames)
-
-  def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex =
-    buildGlobalIndex(GlobalIndex.EdgeType, name, propNames)
-
-  def buildGlobalIndex(elementType: String, name: String, propNames: 
Seq[String]): GlobalIndex = {
-    GlobalIndex.findBy(elementType, name, false) match {
-      case None =>
-        GlobalIndex.insert(elementType, name, propNames)
-        GlobalIndex.findBy(elementType, name, false).get
-      case Some(oldIndex) => oldIndex
-    }
-  }
+  def enableVertexGlobalIndex(columnMeats: Seq[ColumnMeta]): Boolean = {
+    val successes = columnMeats.map { cm =>
+      ColumnMeta.updateStoreInGlobalIndex(cm.id.get, cm.storeInGlobalIndex)
+    }.map(_.isSuccess)
+
+    successes.forall(identity)
+  }
+
+  def enableEdgeGlobalIndex(labelMetas: Seq[LabelMeta]): Boolean = {
+    val successes = labelMetas.map { lm =>
+      LabelMeta.updateStoreInGlobalIndex(lm.id.get, lm.storeInGlobalIndex)
+    }.map(_.isSuccess)
+
+    successes.forall(identity)
+  }
+
+//  def buildGlobalVertexIndex(name: String, propNames: 
java.util.List[String]): GlobalIndex =
+//    buildGlobalIndex(GlobalIndex.VertexType, name, propNames)
+//
+//  def buildGlobalVertexIndex(name: String, propNames: Seq[String]): 
GlobalIndex =
+//    buildGlobalIndex(GlobalIndex.VertexType, name, propNames)
+//
+//  def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): 
GlobalIndex =
+//    buildGlobalIndex(GlobalIndex.EdgeType, name, propNames)
+//
+//  def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): 
GlobalIndex =
+//    buildGlobalIndex(GlobalIndex.EdgeType, name, propNames)
+//
+//  def buildGlobalIndex(elementType: String, name: String, propNames: 
Seq[String]): GlobalIndex = {
+//    GlobalIndex.findBy(elementType, name, false) match {
+//      case None =>
+//        GlobalIndex.insert(elementType, name, propNames)
+//        GlobalIndex.findBy(elementType, name, false).get
+//      case Some(oldIndex) => oldIndex
+//    }
+//  }
 
   def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for 
{
     label <- Try(Label.findByName(labelName, useCache = false).get)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index a9c98c2..07a9be1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -69,30 +69,30 @@ object S2GraphFactory {
     val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, 
DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false)
 
     val DefaultColumnMetas = {
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", 
"integer", useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", useCache = 
false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", useCache = 
false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", useCache = 
false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", 
useCache = false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", useCache 
= false)
-      ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", useCache 
= false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", 
"integer", storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", 
storeInGlobalIndex = true, useCache = false)
+      ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", 
storeInGlobalIndex = true, useCache = false)
     }
 
     //    Management.deleteLabel("_s2graph")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index 6486fa9..cb74cf1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -33,11 +33,11 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
 
   val WaitTime = Duration("60 seconds")
 
-  private def toFields(globalIndex: GlobalIndex, vertex: S2VertexLike): 
Option[Map[String, Any]] = {
+  private def toFields(vertex: S2VertexLike, forceToIndex: Boolean): 
Option[Map[String, Any]] = {
     val props = vertex.props.asScala
-    val filtered = props.filterKeys(globalIndex.propNamesSet)
+    val storeInGlobalIndex = if (forceToIndex) true else 
props.exists(_._2.columnMeta.storeInGlobalIndex)
 
-    if (filtered.isEmpty) None
+    if (!storeInGlobalIndex) None
     else {
       val fields = mutable.Map.empty[String, Any]
 
@@ -45,10 +45,13 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
       fields += (serviceField -> vertex.serviceName)
       fields += (serviceColumnField -> vertex.columnName)
 
-      filtered.foreach { case (dim, s2VertexProperty) =>
-        s2VertexProperty.columnMeta.dataType match {
-          case "string" => fields += (dim -> 
s2VertexProperty.innerVal.value.toString)
-          case _ => fields += (dim -> s2VertexProperty.innerVal.value)
+      props.foreach { case (dim, s2VertexProperty) =>
+        // skip reserved fields.
+        if (s2VertexProperty.columnMeta.seq > 0) {
+          s2VertexProperty.columnMeta.dataType match {
+            case "string" => fields += (dim -> 
s2VertexProperty.innerVal.value.toString)
+            case _ => fields += (dim -> s2VertexProperty.innerVal.value)
+          }
         }
       }
 
@@ -56,11 +59,11 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
     }
   }
 
-  private def toFields(globalIndex: GlobalIndex, edge: S2EdgeLike): 
Option[Map[String, Any]] = {
+  private def toFields(edge: S2EdgeLike, forceToIndex: Boolean): 
Option[Map[String, Any]] = {
     val props = edge.getPropsWithTs().asScala
-    val filtered = props.filterKeys(globalIndex.propNamesSet)
+    val store = if (forceToIndex) true else 
props.exists(_._2.labelMeta.storeInGlobalIndex)
 
-    if (filtered.isEmpty) None
+    if (!store) None
     else {
       val fields = mutable.Map.empty[String, Any]
 
@@ -68,10 +71,12 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
       fields += (serviceField -> edge.serviceName)
       fields += (labelField -> edge.label())
 
-      filtered.foreach { case (dim, s2Property) =>
-        s2Property.labelMeta.dataType match {
-          case "string" => fields += (dim -> 
s2Property.innerVal.value.toString)
-          case _ => fields += (dim -> s2Property.innerVal.value)
+      props.foreach { case (dim, s2Property) =>
+        if (s2Property.labelMeta.seq > 0) {
+          s2Property.labelMeta.dataType match {
+            case "string" => fields += (dim -> 
s2Property.innerVal.value.toString)
+            case _ => fields += (dim -> s2Property.innerVal.value)
+          }
         }
       }
 
@@ -79,16 +84,13 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
     }
   }
 
-  override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): 
Future[Seq[Boolean]] = {
-    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType)
-
-    val bulkRequests = globalIndexOptions.flatMap { globalIndex =>
-      vertices.flatMap { vertex =>
-        toFields(globalIndex, vertex).toSeq.map { fields =>
-          indexInto(globalIndex.backendIndexNameWithType).fields(fields)
+  override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: 
Boolean = false): Future[Seq[Boolean]] = {
+    val bulkRequests = vertices.flatMap { vertex =>
+        toFields(vertex, forceToIndex).toSeq.map { fields =>
+          indexInto(GlobalIndex.TableName).fields(fields)
         }
       }
-    }
+
     if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
     else {
       client.execute {
@@ -104,20 +106,16 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
     }
   }
 
-  override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] =
-    Await.result(mutateVerticesAsync(vertices), WaitTime)
-
-  override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] =
-    Await.result(mutateEdgesAsync(edges), WaitTime)
+  override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: 
Boolean = false): Seq[Boolean] =
+    Await.result(mutateVerticesAsync(vertices, forceToIndex), WaitTime)
 
-  override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] 
= {
-    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
+  override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = 
false): Seq[Boolean] =
+    Await.result(mutateEdgesAsync(edges, forceToIndex), WaitTime)
 
-    val bulkRequests = globalIndexOptions.flatMap { globalIndex =>
-      edges.flatMap { edge =>
-        toFields(globalIndex, edge).toSeq.map { fields =>
-          indexInto(globalIndex.backendIndexNameWithType).fields(fields)
-        }
+  override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean 
= false): Future[Seq[Boolean]] = {
+    val bulkRequests = edges.flatMap { edge =>
+      toFields(edge, forceToIndex).toSeq.map { fields =>
+        indexInto(GlobalIndex.TableName).fields(fields)
       }
     }
 
@@ -141,29 +139,25 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
     val field = eidField
     val ids = new java.util.HashSet[EdgeId]
 
-    GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers) match {
-      case None => Future.successful(new util.ArrayList[EdgeId](ids))
-      case Some(globalIndex) =>
-        val queryString = buildQueryString(hasContainers)
-
-        client.execute {
-          search(globalIndex.backendIndexName).query(queryString)
-        }.map { ret =>
-          ret match {
-            case Left(failure) =>
-            case Right(results) =>
-              results.result.hits.hits.foreach { searchHit =>
-                searchHit.sourceAsMap.get(field).foreach { idValue =>
-                  val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get
-
-                  //TODO: Come up with better way to filter out hits with 
invalid meta.
-                  EdgeId.isValid(id).foreach(ids.add)
-                }
-              }
+    val queryString = buildQueryString(hasContainers)
+
+    client.execute {
+      search(GlobalIndex.TableName).query(queryString)
+    }.map { ret =>
+      ret match {
+        case Left(failure) =>
+        case Right(results) =>
+          results.result.hits.hits.foreach { searchHit =>
+            searchHit.sourceAsMap.get(field).foreach { idValue =>
+              val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get
+
+              //TODO: Come up with better way to filter out hits with invalid 
meta.
+              EdgeId.isValid(id).foreach(ids.add)
+            }
           }
+      }
 
-          new util.ArrayList[EdgeId](ids)
-        }
+      new util.ArrayList[EdgeId](ids)
     }
   }
 
@@ -175,28 +169,24 @@ class ESIndexProvider(config: Config)(implicit ec: 
ExecutionContext) extends Ind
     val field = vidField
     val ids = new java.util.HashSet[VertexId]
 
-    GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers) match {
-      case None => Future.successful(new util.ArrayList[VertexId](ids))
-      case Some(globalIndex) =>
-        val queryString = buildQueryString(hasContainers)
-
-        client.execute {
-          search(globalIndex.backendIndexName).query(queryString)
-        }.map { ret =>
-          ret match {
-            case Left(failure) =>
-            case Right(results) =>
-              results.result.hits.hits.foreach { searchHit =>
-                searchHit.sourceAsMap.get(field).foreach { idValue =>
-                  val id = 
Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get
-                  //TODO: Come up with better way to filter out hits with 
invalid meta.
-                  VertexId.isValid(id).foreach(ids.add)
-                }
-              }
+    val queryString = buildQueryString(hasContainers)
+
+    client.execute {
+      search(GlobalIndex.TableName).query(queryString)
+    }.map { ret =>
+      ret match {
+        case Left(failure) =>
+        case Right(results) =>
+          results.result.hits.hits.foreach { searchHit =>
+            searchHit.sourceAsMap.get(field).foreach { idValue =>
+              val id = 
Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get
+              //TODO: Come up with better way to filter out hits with invalid 
meta.
+              VertexId.isValid(id).foreach(ids.add)
+            }
           }
+      }
 
-          new util.ArrayList[VertexId](ids)
-        }
+      new util.ArrayList[VertexId](ids)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index dcddadf..864209f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -120,11 +120,11 @@ trait IndexProvider {
   def fetchVertexIds(hasContainers: java.util.List[HasContainer]): 
java.util.List[VertexId]
   def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): 
Future[java.util.List[VertexId]]
 
-  def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean]
-  def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]]
+  def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = 
false): Seq[Boolean]
+  def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = 
false): Future[Seq[Boolean]]
 
-  def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean]
-  def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]]
+  def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): 
Seq[Boolean]
+  def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): 
Future[Seq[Boolean]]
 
   def shutdown(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index 0670d48..b750499 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -9,12 +9,11 @@ import org.apache.lucene.index.{DirectoryReader, IndexWriter, 
IndexWriterConfig}
 import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
 import org.apache.lucene.search.IndexSearcher
 import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
-import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
 import org.apache.s2graph.core.io.Conversions
 import org.apache.s2graph.core.mysqls.GlobalIndex
-import org.apache.s2graph.core.mysqls.GlobalIndex.{eidField, labelField, 
serviceColumnField, serviceField, vidField}
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import play.api.libs.json.Json
 
@@ -22,10 +21,11 @@ import scala.concurrent.Future
 
 
 class LuceneIndexProvider(config: Config) extends IndexProvider {
+  import GlobalIndex._
   import IndexProvider._
-  import scala.collection.mutable
+
   import scala.collection.JavaConverters._
-  import GlobalIndex._
+  import scala.collection.mutable
 
   val analyzer = new StandardAnalyzer()
   val writers = mutable.Map.empty[String, IndexWriter]
@@ -39,11 +39,11 @@ class LuceneIndexProvider(config: Config) extends 
IndexProvider {
     })
   }
 
-  private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): 
Option[Document] = {
+  private def toDocument(vertex: S2VertexLike, forceToIndex: Boolean): 
Option[Document] = {
     val props = vertex.props.asScala
-    val filtered = props.filterKeys(globalIndex.propNamesSet)
+    val storeInGlobalIndex = if (forceToIndex) true else 
props.exists(_._2.columnMeta.storeInGlobalIndex)
 
-    if (filtered.isEmpty) None
+    if (!storeInGlobalIndex) None
     else {
       val doc = new Document()
       val id = vertex.id.toString
@@ -52,24 +52,28 @@ class LuceneIndexProvider(config: Config) extends 
IndexProvider {
       doc.add(new StringField(serviceField, vertex.serviceName, 
Field.Store.YES))
       doc.add(new StringField(serviceColumnField, vertex.columnName, 
Field.Store.YES))
 
-      filtered.foreach { case (dim, s2VertexProperty) =>
-        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES 
else Field.Store.NO
-        val field = s2VertexProperty.columnMeta.dataType match {
-          case "string" => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
-          case _ => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
+      props.foreach { case (dim, s2VertexProperty) =>
+        val columnMeta = s2VertexProperty.columnMeta
+        if (columnMeta.seq > 0) {
+          val shouldIndex = if (columnMeta.storeInGlobalIndex) Field.Store.YES 
else Field.Store.NO
+
+          val field = columnMeta.dataType match {
+            case "string" => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
+            case _ => new StringField(dim, 
s2VertexProperty.innerVal.value.toString, shouldIndex)
+          }
+          doc.add(field)
         }
-        doc.add(field)
       }
 
       Option(doc)
     }
   }
 
-  private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): 
Option[Document] = {
+  private def toDocument(edge: S2EdgeLike, forceToIndex: Boolean): 
Option[Document] = {
     val props = edge.getPropsWithTs().asScala
-    val filtered = props.filterKeys(globalIndex.propNamesSet)
+    val store = if (forceToIndex) true else 
props.exists(_._2.labelMeta.storeInGlobalIndex)
 
-    if (filtered.isEmpty) None
+    if (!store) None
     else {
       val doc = new Document()
       val id = edge.edgeId.toString
@@ -78,8 +82,9 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
       doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES))
       doc.add(new StringField(labelField, edge.label(), Field.Store.YES))
 
-      filtered.foreach { case (dim, s2Property) =>
-        val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES 
else Field.Store.NO
+      props.foreach { case (dim, s2Property) =>
+        val shouldIndex = if (s2Property.labelMeta.storeInGlobalIndex) 
Field.Store.YES else Field.Store.NO
+
         val field = s2Property.labelMeta.dataType match {
           case "string" => new StringField(dim, 
s2Property.innerVal.value.toString, shouldIndex)
           case _ => new StringField(dim, s2Property.innerVal.value.toString, 
shouldIndex)
@@ -91,39 +96,37 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     }
   }
 
-  override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = {
-    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType)
+  override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean 
= false): Future[Seq[Boolean]] =
+    Future.successful(mutateEdges(edges, forceToIndex))
 
-    globalIndexOptions.map { globalIndex =>
-      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
+  override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: 
Boolean = false): Seq[Boolean] = {
+    val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName)
 
-      vertices.foreach { vertex =>
-        toDocument(globalIndex, vertex).foreach { doc =>
-          writer.addDocument(doc)
-        }
+    vertices.foreach { vertex =>
+      toDocument(vertex, forceToIndex).foreach { doc =>
+        writer.addDocument(doc)
       }
-
-      writer.commit()
     }
 
+    writer.commit()
+
     vertices.map(_ => true)
   }
 
-  override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = {
-    val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
+  override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: 
Boolean = false): Future[Seq[Boolean]] =
+    Future.successful(mutateVertices(vertices, forceToIndex))
 
-    globalIndexOptions.map { globalIndex =>
-      val writer = getOrElseCreateIndexWriter(globalIndex.indexName)
+  override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = 
false): Seq[Boolean] = {
+    val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName)
 
-      edges.foreach { edge =>
-        toDocument(globalIndex, edge).foreach { doc =>
-          writer.addDocument(doc)
-        }
+    edges.foreach { edge =>
+      toDocument(edge, forceToIndex).foreach { doc =>
+        writer.addDocument(doc)
       }
-
-      writer.commit()
     }
 
+    writer.commit()
+
     edges.map(_ => true)
   }
 
@@ -131,30 +134,28 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     val field = eidField
     val ids = new java.util.HashSet[EdgeId]
 
-    GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { 
globalIndex =>
-      val queryString = buildQueryString(hasContainers)
+    val queryString = buildQueryString(hasContainers)
 
-      try {
-        val q = new QueryParser(field, analyzer).parse(queryString)
+    try {
+      val q = new QueryParser(field, analyzer).parse(queryString)
 
-        val reader = DirectoryReader.open(directories(globalIndex.indexName))
-        val searcher = new IndexSearcher(reader)
+      val reader = DirectoryReader.open(directories(GlobalIndex.TableName))
+      val searcher = new IndexSearcher(reader)
 
-        val docs = searcher.search(q, hitsPerPage)
+      val docs = searcher.search(q, hitsPerPage)
 
-        docs.scoreDocs.foreach { scoreDoc =>
-          val document = searcher.doc(scoreDoc.doc)
-          val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
-          ids.add(id);
-        }
+      docs.scoreDocs.foreach { scoreDoc =>
+        val document = searcher.doc(scoreDoc.doc)
+        val id = 
Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
+        ids.add(id);
+      }
 
-        reader.close()
+      reader.close()
+      ids
+    } catch {
+      case ex: ParseException =>
+        logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
         ids
-      } catch {
-        case ex: ParseException =>
-          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
-          ids
-      }
     }
 
     new util.ArrayList[EdgeId](ids)
@@ -163,31 +164,28 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
   override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): 
java.util.List[VertexId] = {
     val field = vidField
     val ids = new java.util.HashSet[VertexId]
+    val queryString = buildQueryString(hasContainers)
 
-    GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { 
globalIndex =>
-      val queryString = buildQueryString(hasContainers)
+    try {
+      val q = new QueryParser(field, analyzer).parse(queryString)
 
-      try {
-        val q = new QueryParser(field, analyzer).parse(queryString)
+      val reader = DirectoryReader.open(directories(GlobalIndex.TableName))
+      val searcher = new IndexSearcher(reader)
 
-        val reader = DirectoryReader.open(directories(globalIndex.indexName))
-        val searcher = new IndexSearcher(reader)
+      val docs = searcher.search(q, hitsPerPage)
 
-        val docs = searcher.search(q, hitsPerPage)
-
-        docs.scoreDocs.foreach { scoreDoc =>
-          val document = searcher.doc(scoreDoc.doc)
-          val id = 
Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
-          ids.add(id)
-        }
+      docs.scoreDocs.foreach { scoreDoc =>
+        val document = searcher.doc(scoreDoc.doc)
+        val id = 
Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
+        ids.add(id)
+      }
 
-        reader.close()
+      reader.close()
+      ids
+    } catch {
+      case ex: ParseException =>
+        logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
         ids
-      } catch {
-        case ex: ParseException =>
-          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
-          ids
-      }
     }
 
     new util.ArrayList[VertexId](ids)
@@ -200,9 +198,5 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
   override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): 
Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
 
   override def fetchVertexIdsAsync(hasContainers: 
java.util.List[HasContainer]): Future[util.List[VertexId]] = 
Future.successful(fetchVertexIds(hasContainers))
-
-  override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): 
Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
-
-  override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] 
= Future.successful(mutateEdges(edges))
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
index f314083..974965f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
@@ -91,7 +91,8 @@ object Conversions {
       (JsPath \ "columnId").read[Int] and
       (JsPath \ "name").read[String] and
       (JsPath \ "seq").read[Byte] and
-      (JsPath \ "dataType").read[String]
+      (JsPath \ "dataType").read[String] and
+      (JsPath \ "storeGlobalIndex").read[Boolean]
     )(ColumnMeta.apply _)
 
   implicit val columnMetaWrites: Writes[ColumnMeta] = (
@@ -99,7 +100,8 @@ object Conversions {
       (JsPath \ "columnId").write[Int] and
       (JsPath \ "name").write[String] and
       (JsPath \ "seq").write[Byte] and
-      (JsPath \ "dataType").write[String]
+      (JsPath \ "dataType").write[String] and
+      (JsPath \ "storeGlobalIndex").write[Boolean]
     )(unlift(ColumnMeta.unapply))
 
   /* Graph Class */

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
index 14ad381..b764841 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
@@ -22,6 +22,8 @@ package org.apache.s2graph.core.mysqls
 import play.api.libs.json.Json
 import scalikejdbc._
 
+import scala.util.Try
+
 object ColumnMeta extends Model[ColumnMeta] {
 
   val timeStampSeq = -1.toByte
@@ -34,7 +36,8 @@ object ColumnMeta extends Model[ColumnMeta] {
   val reservedMetaNamesSet = reservedMetas.map(_.name).toSet
 
   def valueOf(rs: WrappedResultSet): ColumnMeta = {
-    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), 
rs.byte("seq"), rs.string("data_type").toLowerCase())
+    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"),
+      rs.byte("seq"), rs.string("data_type").toLowerCase(), 
rs.boolean("store_in_global_index"))
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession) = {
@@ -69,21 +72,25 @@ object ColumnMeta extends Model[ColumnMeta] {
     }
   }
 
-  def insert(columnId: Int, name: String, dataType: String)(implicit session: 
DBSession = AutoSession) = {
+  def insert(columnId: Int, name: String, dataType: String, 
storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) 
= {
     val ls = findAllByColumn(columnId, false)
     val seq = ls.size + 1
     if (seq <= maxValue) {
-      sql"""insert into column_metas(column_id, name, seq, data_type)
-    select ${columnId}, ${name}, ${seq}, ${dataType}"""
+      sql"""insert into column_metas(column_id, name, seq, data_type, 
store_in_global_index)
+    select ${columnId}, ${name}, ${seq}, ${dataType}, ${storeInGlobalIndex}"""
         .updateAndReturnGeneratedKey.apply()
     }
   }
 
-  def findOrInsert(columnId: Int, name: String, dataType: String, useCache: 
Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = {
+  def findOrInsert(columnId: Int,
+                   name: String,
+                   dataType: String,
+                   storeInGlobalIndex: Boolean = false,
+                   useCache: Boolean = true)(implicit session: DBSession = 
AutoSession): ColumnMeta = {
     findByName(columnId, name, useCache) match {
       case Some(c) => c
       case None =>
-        insert(columnId, name, dataType)
+        insert(columnId, name, dataType, storeInGlobalIndex)
         expireCache(s"columnId=$columnId:name=$name")
         findByName(columnId, name).get
     }
@@ -130,9 +137,20 @@ object ColumnMeta extends Model[ColumnMeta] {
       (cacheKey -> ls)
     }.toList)
   }
+
+  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit 
session: DBSession = AutoSession): Try[Long] = Try {
+    sql"""
+          update column_metas set store_in_global_index = 
${storeInGlobalIndex} where id = ${id}
+       """.updateAndReturnGeneratedKey.apply()
+  }
 }
 
-case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, 
dataType: String) {
+case class ColumnMeta(id: Option[Int],
+                      columnId: Int,
+                      name: String,
+                      seq: Byte,
+                      dataType: String,
+                      storeInGlobalIndex: Boolean = false) {
   lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
   override def equals(other: Any): Boolean = {
     if (!other.isInstanceOf[ColumnMeta]) false

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 97fd704..2956f89 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -204,8 +204,8 @@ object Label extends Model[Label] {
             hTableName.getOrElse(service.hTableName), 
hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
             compressionAlgorithm, options).toInt
 
-          val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, 
dataType) =>
-            val labelMeta = LabelMeta.findOrInsert(createdId, propName, 
defaultValue, dataType)
+          val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, 
dataType, storeInGlobalIndex) =>
+            val labelMeta = LabelMeta.findOrInsert(createdId, propName, 
defaultValue, dataType, storeInGlobalIndex)
             (propName -> labelMeta.seq)
           }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name 
-> labelMeta.seq).toMap
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
index a16334a..920b432 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
@@ -28,6 +28,8 @@ import org.apache.s2graph.core.{GraphExceptions, JSONParser}
 import play.api.libs.json.Json
 import scalikejdbc._
 
+import scala.util.Try
+
 object LabelMeta extends Model[LabelMeta] {
 
   /** dummy sequences */
@@ -78,7 +80,8 @@ object LabelMeta extends Model[LabelMeta] {
   val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", 
"_from_hash", "label", "direction", "timestamp", "_timestamp")
 
   def apply(rs: WrappedResultSet): LabelMeta = {
-    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), 
rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase)
+    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), 
rs.byte("seq"),
+      rs.string("default_value"), rs.string("data_type").toLowerCase, 
rs.boolean("store_in_global_index"))
   }
 
   /** Note: DegreeSeq should not be included in serializer/deserializer.
@@ -124,13 +127,13 @@ object LabelMeta extends Model[LabelMeta] {
     }
   }
 
-  def insert(labelId: Int, name: String, defaultValue: String, dataType: 
String)(implicit session: DBSession = AutoSession) = {
+  def insert(labelId: Int, name: String, defaultValue: String, dataType: 
String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = 
AutoSession) = {
     val ls = findAllByLabelId(labelId, useCache = false)
     val seq = ls.size + 1
 
     if (seq < maxValue) {
-      sql"""insert into label_metas(label_id, name, seq, default_value, 
data_type)
-    select ${labelId}, ${name}, ${seq}, ${defaultValue}, 
${dataType}""".updateAndReturnGeneratedKey.apply()
+      sql"""insert into label_metas(label_id, name, seq, default_value, 
data_type, store_in_global_index)
+    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, 
${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
     } else {
       throw MaxPropSizeReachedException("max property size reached")
     }
@@ -139,12 +142,13 @@ object LabelMeta extends Model[LabelMeta] {
   def findOrInsert(labelId: Int,
                    name: String,
                    defaultValue: String,
-                   dataType: String)(implicit session: DBSession = 
AutoSession): LabelMeta = {
+                   dataType: String,
+                   storeInGlobalIndex: Boolean = false)(implicit session: 
DBSession = AutoSession): LabelMeta = {
 
     findByName(labelId, name) match {
       case Some(c) => c
       case None =>
-        insert(labelId, name, defaultValue, dataType)
+        insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
         val cacheKey = "labelId=" + labelId + ":name=" + name
         val cacheKeys = "labelId=" + labelId
         expireCache(cacheKey)
@@ -184,6 +188,12 @@ object LabelMeta extends Model[LabelMeta] {
       cacheKey -> ls
     }.toList)
   }
+
+  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit 
session: DBSession = AutoSession): Try[Long] = Try {
+    sql"""
+          update label_metas set store_in_global_index = ${storeInGlobalIndex} 
where id = ${id}
+       """.updateAndReturnGeneratedKey.apply()
+  }
 }
 
 case class LabelMeta(id: Option[Int],
@@ -191,7 +201,8 @@ case class LabelMeta(id: Option[Int],
                      name: String,
                      seq: Byte,
                      defaultValue: String,
-                     dataType: String) {
+                     dataType: String,
+                     storeInGlobalIndex: Boolean = false) {
   lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, 
"dataType" -> dataType)
   override def equals(other: Any): Boolean = {
     if (!other.isInstanceOf[LabelMeta]) false

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
index e745cb0..d7e9cea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
@@ -123,4 +123,6 @@ case class Service(id: Option[Int],
   lazy val extraOptions = Model.extraOptions(options)
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
   def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index 08f075d..820f93a 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -95,14 +95,14 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
     val vertexPropsKeys = List("age" -> "integer", "im" -> "string")
 
     vertexPropsKeys.map { case (key, keyType) =>
-      Management.addVertexProp(testServiceName, testColumnName, key, keyType)
+      Management.addVertexProp(testServiceName, testColumnName, key, keyType, 
storeInGlobalIndex = true)
     }
 
     // vertex type global index.
-    val globalVertexIndex = 
management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", 
Seq("age"))
+//    val globalVertexIndex = 
management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", 
Seq("age"))
 
     // edge type global index.
-    val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, 
"test_weight_time_edge", Seq("weight", "time"))
+//    val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, 
"test_weight_time_edge", Seq("weight", "time"))
 
     logger.info("[init end]: 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index 679f94a..9ed12e6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -43,7 +43,7 @@ class IndexProviderTest extends IntegrateCommon {
     val testColumn = ServiceColumn.find(testService.id.get, 
TestUtil.testColumnName).get
     val vertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L)
     val indexPropsColumnMeta = testColumn.metasInvMap("age")
-
+    ColumnMeta.updateStoreInGlobalIndex(indexPropsColumnMeta.id.get, 
storeInGlobalIndex = true)
 
     val propsWithTs = Map(
       indexPropsColumnMeta -> InnerVal.withInt(1, "v4")
@@ -85,8 +85,11 @@ class IndexProviderTest extends IntegrateCommon {
     val otherVertexId = 
graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L)
     val vertex = graph.elementBuilder.newVertex(vertexId)
     val otherVertex = graph.elementBuilder.newVertex(otherVertexId)
-    val weightMeta = testLabel.metaPropsInvMap("weight")
-    val timeMeta = testLabel.metaPropsInvMap("time")
+    val weightMeta = 
testLabel.metaPropsInvMap("weight").copy(storeInGlobalIndex = true)
+    val timeMeta = testLabel.metaPropsInvMap("time").copy(storeInGlobalIndex = 
true)
+
+    LabelMeta.updateStoreInGlobalIndex(weightMeta.id.get, storeInGlobalIndex = 
true)
+    LabelMeta.updateStoreInGlobalIndex(timeMeta.id.get, storeInGlobalIndex = 
true)
 
     val propsWithTs = Map(
       weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"),
@@ -104,7 +107,7 @@ class IndexProviderTest extends IntegrateCommon {
 
     println(s"[# of edges]: ${edges.size}")
     edges.foreach(e => println(s"[Edge]: $e"))
-    indexProvider.mutateEdges(edges)
+    indexProvider.mutateEdges(edges, forceToIndex = true)
 
     // match
     (0 until numOfTry).foreach { _ =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala
deleted file mode 100644
index 1213ad0..0000000
--- a/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.models
-
-import org.apache.s2graph.core.TestCommonWithModels
-import org.apache.s2graph.core.mysqls.GlobalIndex
-import org.apache.tinkerpop.gremlin.process.traversal.P
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import scala.collection.JavaConversions._
-
-class GlobalIndexTest extends FunSuite with Matchers with TestCommonWithModels 
with BeforeAndAfterAll {
-  override def beforeAll(): Unit = {
-    initTests()
-  }
-
-  override def afterAll(): Unit = {
-    graph.shutdown()
-  }
-
-  test("test buildGlobalIndex.") {
-    Seq(GlobalIndex.EdgeType, GlobalIndex.VertexType).foreach { elementType =>
-      management.buildGlobalIndex(elementType, "test_global", Seq("weight", 
"date", "name"))
-    }
-  }
-
-  test("findGlobalIndex.") {
-    // (weight: 34) AND (weight: [1 TO 100])
-//    Seq(GlobalIndex.EdgeType, GlobalIndex.VertexType).foreach { elementType 
=>
-//      val idx1 = management.buildGlobalIndex(elementType, "test-1", 
Seq("weight", "age", "name"))
-//      val idx2 = management.buildGlobalIndex(elementType, "test-2", 
Seq("gender", "age"))
-//      val idx3 = management.buildGlobalIndex(elementType, "test-3", 
Seq("class"))
-//
-//      var hasContainers = Seq(
-//        new HasContainer("weight", P.eq(Int.box(34))),
-//        new HasContainer("age", P.between(Int.box(1), Int.box(100)))
-//      )
-//
-//      GlobalIndex.findGlobalIndex(elementType, hasContainers) 
shouldBe(Option(idx1))
-//
-//      hasContainers = Seq(
-//        new HasContainer("gender", P.eq(Int.box(34))),
-//        new HasContainer("age", P.eq(Int.box(34))),
-//        new HasContainer("class", P.eq(Int.box(34)))
-//      )
-//
-//      GlobalIndex.findGlobalIndex(elementType, hasContainers) 
shouldBe(Option(idx2))
-//
-//      hasContainers = Seq(
-//        new HasContainer("class", P.eq(Int.box(34))),
-//        new HasContainer("_", P.eq(Int.box(34)))
-//      )
-//
-//      GlobalIndex.findGlobalIndex(elementType, hasContainers) 
shouldBe(Option(idx3))
-//
-//      hasContainers = Seq(
-//        new HasContainer("key", P.eq(Int.box(34))),
-//        new HasContainer("value", P.eq(Int.box(34)))
-//      )
-//
-//      GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(None)
-//    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index bf3291e..ca78917 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -143,7 +143,7 @@ class S2GraphProvider extends AbstractGraphProvider {
       }
 
       ColumnMeta.findByName(defaultServiceColumn.id.get, "aKey", useCache = 
false).foreach(cm => ColumnMeta.delete(cm.id.get))
-      ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, 
useCache = false)
+      ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, 
storeInGlobalIndex = true, useCache = false)
     }
     if (loadGraphWith != null && loadGraphWith.value() == GraphData.MODERN) {
       mnt.createLabel("knows", defaultService.serviceName, "person", 
"integer", defaultService.serviceName, "person", "integer",
@@ -454,9 +454,6 @@ class S2GraphProvider extends AbstractGraphProvider {
       options = Option("""{"skipReverse": false}""")
     )
 
-    Seq(GlobalIndex.VertexType, GlobalIndex.EdgeType).foreach { elementType =>
-      mnt.buildGlobalIndex(elementType, "global", allProps.map(_.name).toSeq)
-    }
     super.loadGraphData(graph, loadGraphWith, testClass, testName)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
index 18a6c85..45060cc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
@@ -41,7 +41,6 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
   initTests()
 
   val g = new S2Graph(config)
-  lazy val gIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, 
"S2GraphTest2", Seq("weight"))
   def printEdges(edges: Seq[Edge]): Unit = {
     edges.foreach { edge =>
       logger.debug(s"[FetchedEdge]: $edge")
@@ -417,7 +416,6 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
 ////    }
 //  }
   ignore("Modern") {
-    gIndex
     val mnt = graph.management
 
     S2GraphFactory.cleanupDefaultSchema

Reply via email to