move global index setting under ColumnMeta, LabelMeta.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/63012f8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/63012f8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/63012f8e Branch: refs/heads/master Commit: 63012f8e51b65626b3b62fa20cded2b6c809da4e Parents: 045392f Author: DO YUNG YOON <[email protected]> Authored: Fri Feb 9 21:57:23 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Feb 10 09:03:31 2018 +0900 ---------------------------------------------------------------------- dev_support/graph_mysql/schema.sql | 2 + .../org/apache/s2graph/core/mysqls/schema.sql | 2 + .../org/apache/s2graph/core/Management.scala | 79 ++++++---- .../apache/s2graph/core/S2GraphFactory.scala | 48 +++--- .../s2graph/core/index/ESIndexProvider.scala | 140 ++++++++--------- .../s2graph/core/index/IndexProvider.scala | 8 +- .../core/index/LuceneIndexProvider.scala | 152 +++++++++---------- .../apache/s2graph/core/io/Conversions.scala | 6 +- .../apache/s2graph/core/mysqls/ColumnMeta.scala | 32 +++- .../org/apache/s2graph/core/mysqls/Label.scala | 4 +- .../apache/s2graph/core/mysqls/LabelMeta.scala | 25 ++- .../apache/s2graph/core/mysqls/Service.scala | 2 + .../core/Integrate/IntegrateCommon.scala | 6 +- .../s2graph/core/index/IndexProviderTest.scala | 11 +- .../s2graph/core/models/GlobalIndexTest.scala | 81 ---------- .../core/tinkerpop/S2GraphProvider.scala | 5 +- .../core/tinkerpop/structure/S2GraphTest.scala | 2 - 17 files changed, 281 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/dev_support/graph_mysql/schema.sql ---------------------------------------------------------------------- diff --git a/dev_support/graph_mysql/schema.sql b/dev_support/graph_mysql/schema.sql index ea72c63..48b27df 100644 --- a/dev_support/graph_mysql/schema.sql +++ b/dev_support/graph_mysql/schema.sql @@ -76,6 +76,7 @@ CREATE TABLE `column_metas` ( `name` varchar(64) NOT NULL, `seq` tinyint NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'string', + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), INDEX `idx_column_id_seq` (`column_id`, `seq`) @@ -147,6 +148,7 @@ CREATE TABLE `label_metas` ( `default_value` varchar(64) NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'long', `used_in_index` tinyint NOT NULL DEFAULT 0, + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_label_id_name` (`label_id`, `name`), INDEX `idx_label_id_seq` (`label_id`, `seq`) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql index f5f6a61..b66b46b 100644 --- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql +++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql @@ -65,6 +65,7 @@ CREATE TABLE `column_metas` ( `name` varchar(64) NOT NULL, `seq` tinyint NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'string', + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), INDEX `idx_column_id_seq` (`column_id`, `seq`) @@ -136,6 +137,7 @@ CREATE TABLE `label_metas` ( `default_value` varchar(64) NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'long', `used_in_index` tinyint NOT NULL DEFAULT 0, + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_label_metas_label_id_name` (`label_id`, `name`), INDEX `idx_label_metas_label_id_seq` (`label_id`, `seq`) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 8d2d62a..3af8b79 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -63,9 +63,9 @@ object Management { object JsonModel { - case class Prop(name: String, defaultValue: String, datatType: String) + case class Prop(name: String, defaultValue: String, datatType: String, storeInGlobalIndex: Boolean = false) - object Prop extends ((String, String, String) => Prop) + object Prop extends ((String, String, String, Boolean) => Prop) case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) } @@ -102,9 +102,10 @@ object Management { case Some(service) => val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) for { - Prop(propName, defaultValue, dataType) <- props + Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, useCache = false) + ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, + storeInGlobalIndex = storeInGlobalIndex, useCache = false) } } } @@ -165,7 +166,7 @@ object Management { val labelOpt = Label.findByName(labelStr) val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) - LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType) + LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType, prop.storeInGlobalIndex) } } @@ -175,8 +176,8 @@ object Management { val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) props.map { - case Prop(propName, defaultValue, dataType) => - LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType) + case Prop(propName, defaultValue, dataType, storeInGlobalIndex) => + LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType, storeInGlobalIndex) } } } @@ -185,12 +186,13 @@ object Management { columnName: String, propsName: String, propsType: String, + storeInGlobalIndex: Boolean = false, schemaVersion: String = DEFAULT_VERSION): ColumnMeta = { val result = for { service <- Service.findByName(serviceName, useCache = false) serviceColumn <- ServiceColumn.find(service.id.get, columnName) } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType) + ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, storeInGlobalIndex) } result.getOrElse({ throw new RuntimeException(s"add property on vertex failed") @@ -361,9 +363,10 @@ class Management(graph: S2GraphLike) { case Some(service) => val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) for { - Prop(propName, defaultValue, dataType) <- props + Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, useCache = false) + ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, + storeInGlobalIndex = storeInGlobalIndex, useCache = false) } serviceColumn } @@ -466,26 +469,42 @@ class Management(graph: S2GraphLike) { old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options) } - def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.VertexType, name, propNames) - - def buildGlobalVertexIndex(name: String, propNames: Seq[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.VertexType, name, propNames) - - def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) - - def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) - - def buildGlobalIndex(elementType: String, name: String, propNames: Seq[String]): GlobalIndex = { - GlobalIndex.findBy(elementType, name, false) match { - case None => - GlobalIndex.insert(elementType, name, propNames) - GlobalIndex.findBy(elementType, name, false).get - case Some(oldIndex) => oldIndex - } - } + def enableVertexGlobalIndex(columnMeats: Seq[ColumnMeta]): Boolean = { + val successes = columnMeats.map { cm => + ColumnMeta.updateStoreInGlobalIndex(cm.id.get, cm.storeInGlobalIndex) + }.map(_.isSuccess) + + successes.forall(identity) + } + + def enableEdgeGlobalIndex(labelMetas: Seq[LabelMeta]): Boolean = { + val successes = labelMetas.map { lm => + LabelMeta.updateStoreInGlobalIndex(lm.id.get, lm.storeInGlobalIndex) + }.map(_.isSuccess) + + successes.forall(identity) + } + +// def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.VertexType, name, propNames) +// +// def buildGlobalVertexIndex(name: String, propNames: Seq[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.VertexType, name, propNames) +// +// def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) +// +// def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) +// +// def buildGlobalIndex(elementType: String, name: String, propNames: Seq[String]): GlobalIndex = { +// GlobalIndex.findBy(elementType, name, false) match { +// case None => +// GlobalIndex.insert(elementType, name, propNames) +// GlobalIndex.findBy(elementType, name, false).get +// case Some(oldIndex) => oldIndex +// } +// } def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for { label <- Try(Label.findByName(labelName, useCache = false).get) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala index a9c98c2..07a9be1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala @@ -69,30 +69,30 @@ object S2GraphFactory { val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false) val DefaultColumnMetas = { - ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", storeInGlobalIndex = true, useCache = false) } // Management.deleteLabel("_s2graph") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala index 6486fa9..cb74cf1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala @@ -33,11 +33,11 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind val WaitTime = Duration("60 seconds") - private def toFields(globalIndex: GlobalIndex, vertex: S2VertexLike): Option[Map[String, Any]] = { + private def toFields(vertex: S2VertexLike, forceToIndex: Boolean): Option[Map[String, Any]] = { val props = vertex.props.asScala - val filtered = props.filterKeys(globalIndex.propNamesSet) + val storeInGlobalIndex = if (forceToIndex) true else props.exists(_._2.columnMeta.storeInGlobalIndex) - if (filtered.isEmpty) None + if (!storeInGlobalIndex) None else { val fields = mutable.Map.empty[String, Any] @@ -45,10 +45,13 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind fields += (serviceField -> vertex.serviceName) fields += (serviceColumnField -> vertex.columnName) - filtered.foreach { case (dim, s2VertexProperty) => - s2VertexProperty.columnMeta.dataType match { - case "string" => fields += (dim -> s2VertexProperty.innerVal.value.toString) - case _ => fields += (dim -> s2VertexProperty.innerVal.value) + props.foreach { case (dim, s2VertexProperty) => + // skip reserved fields. + if (s2VertexProperty.columnMeta.seq > 0) { + s2VertexProperty.columnMeta.dataType match { + case "string" => fields += (dim -> s2VertexProperty.innerVal.value.toString) + case _ => fields += (dim -> s2VertexProperty.innerVal.value) + } } } @@ -56,11 +59,11 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind } } - private def toFields(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Map[String, Any]] = { + private def toFields(edge: S2EdgeLike, forceToIndex: Boolean): Option[Map[String, Any]] = { val props = edge.getPropsWithTs().asScala - val filtered = props.filterKeys(globalIndex.propNamesSet) + val store = if (forceToIndex) true else props.exists(_._2.labelMeta.storeInGlobalIndex) - if (filtered.isEmpty) None + if (!store) None else { val fields = mutable.Map.empty[String, Any] @@ -68,10 +71,12 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind fields += (serviceField -> edge.serviceName) fields += (labelField -> edge.label()) - filtered.foreach { case (dim, s2Property) => - s2Property.labelMeta.dataType match { - case "string" => fields += (dim -> s2Property.innerVal.value.toString) - case _ => fields += (dim -> s2Property.innerVal.value) + props.foreach { case (dim, s2Property) => + if (s2Property.labelMeta.seq > 0) { + s2Property.labelMeta.dataType match { + case "string" => fields += (dim -> s2Property.innerVal.value.toString) + case _ => fields += (dim -> s2Property.innerVal.value) + } } } @@ -79,16 +84,13 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind } } - override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType) - - val bulkRequests = globalIndexOptions.flatMap { globalIndex => - vertices.flatMap { vertex => - toFields(globalIndex, vertex).toSeq.map { fields => - indexInto(globalIndex.backendIndexNameWithType).fields(fields) + override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { + val bulkRequests = vertices.flatMap { vertex => + toFields(vertex, forceToIndex).toSeq.map { fields => + indexInto(GlobalIndex.TableName).fields(fields) } } - } + if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true)) else { client.execute { @@ -104,20 +106,16 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind } } - override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = - Await.result(mutateVerticesAsync(vertices), WaitTime) - - override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = - Await.result(mutateEdgesAsync(edges), WaitTime) + override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = + Await.result(mutateVerticesAsync(vertices, forceToIndex), WaitTime) - override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) + override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = + Await.result(mutateEdgesAsync(edges, forceToIndex), WaitTime) - val bulkRequests = globalIndexOptions.flatMap { globalIndex => - edges.flatMap { edge => - toFields(globalIndex, edge).toSeq.map { fields => - indexInto(globalIndex.backendIndexNameWithType).fields(fields) - } + override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { + val bulkRequests = edges.flatMap { edge => + toFields(edge, forceToIndex).toSeq.map { fields => + indexInto(GlobalIndex.TableName).fields(fields) } } @@ -141,29 +139,25 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind val field = eidField val ids = new java.util.HashSet[EdgeId] - GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers) match { - case None => Future.successful(new util.ArrayList[EdgeId](ids)) - case Some(globalIndex) => - val queryString = buildQueryString(hasContainers) - - client.execute { - search(globalIndex.backendIndexName).query(queryString) - }.map { ret => - ret match { - case Left(failure) => - case Right(results) => - results.result.hits.hits.foreach { searchHit => - searchHit.sourceAsMap.get(field).foreach { idValue => - val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get - - //TODO: Come up with better way to filter out hits with invalid meta. - EdgeId.isValid(id).foreach(ids.add) - } - } + val queryString = buildQueryString(hasContainers) + + client.execute { + search(GlobalIndex.TableName).query(queryString) + }.map { ret => + ret match { + case Left(failure) => + case Right(results) => + results.result.hits.hits.foreach { searchHit => + searchHit.sourceAsMap.get(field).foreach { idValue => + val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get + + //TODO: Come up with better way to filter out hits with invalid meta. + EdgeId.isValid(id).foreach(ids.add) + } } + } - new util.ArrayList[EdgeId](ids) - } + new util.ArrayList[EdgeId](ids) } } @@ -175,28 +169,24 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind val field = vidField val ids = new java.util.HashSet[VertexId] - GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers) match { - case None => Future.successful(new util.ArrayList[VertexId](ids)) - case Some(globalIndex) => - val queryString = buildQueryString(hasContainers) - - client.execute { - search(globalIndex.backendIndexName).query(queryString) - }.map { ret => - ret match { - case Left(failure) => - case Right(results) => - results.result.hits.hits.foreach { searchHit => - searchHit.sourceAsMap.get(field).foreach { idValue => - val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get - //TODO: Come up with better way to filter out hits with invalid meta. - VertexId.isValid(id).foreach(ids.add) - } - } + val queryString = buildQueryString(hasContainers) + + client.execute { + search(GlobalIndex.TableName).query(queryString) + }.map { ret => + ret match { + case Left(failure) => + case Right(results) => + results.result.hits.hits.foreach { searchHit => + searchHit.sourceAsMap.get(field).foreach { idValue => + val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get + //TODO: Come up with better way to filter out hits with invalid meta. + VertexId.isValid(id).foreach(ids.add) + } } + } - new util.ArrayList[VertexId](ids) - } + new util.ArrayList[VertexId](ids) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index dcddadf..864209f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -120,11 +120,11 @@ trait IndexProvider { def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]] - def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] - def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] + def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] + def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] - def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] - def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] + def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] + def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] def shutdown(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala index 0670d48..b750499 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala @@ -9,12 +9,11 @@ import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} import org.apache.lucene.search.IndexSearcher import org.apache.lucene.store.{BaseDirectory, RAMDirectory} -import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} import org.apache.s2graph.core.io.Conversions import org.apache.s2graph.core.mysqls.GlobalIndex -import org.apache.s2graph.core.mysqls.GlobalIndex.{eidField, labelField, serviceColumnField, serviceField, vidField} import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import play.api.libs.json.Json @@ -22,10 +21,11 @@ import scala.concurrent.Future class LuceneIndexProvider(config: Config) extends IndexProvider { + import GlobalIndex._ import IndexProvider._ - import scala.collection.mutable + import scala.collection.JavaConverters._ - import GlobalIndex._ + import scala.collection.mutable val analyzer = new StandardAnalyzer() val writers = mutable.Map.empty[String, IndexWriter] @@ -39,11 +39,11 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { }) } - private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): Option[Document] = { + private def toDocument(vertex: S2VertexLike, forceToIndex: Boolean): Option[Document] = { val props = vertex.props.asScala - val filtered = props.filterKeys(globalIndex.propNamesSet) + val storeInGlobalIndex = if (forceToIndex) true else props.exists(_._2.columnMeta.storeInGlobalIndex) - if (filtered.isEmpty) None + if (!storeInGlobalIndex) None else { val doc = new Document() val id = vertex.id.toString @@ -52,24 +52,28 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES)) doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES)) - filtered.foreach { case (dim, s2VertexProperty) => - val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO - val field = s2VertexProperty.columnMeta.dataType match { - case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) - case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + props.foreach { case (dim, s2VertexProperty) => + val columnMeta = s2VertexProperty.columnMeta + if (columnMeta.seq > 0) { + val shouldIndex = if (columnMeta.storeInGlobalIndex) Field.Store.YES else Field.Store.NO + + val field = columnMeta.dataType match { + case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + } + doc.add(field) } - doc.add(field) } Option(doc) } } - private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = { + private def toDocument(edge: S2EdgeLike, forceToIndex: Boolean): Option[Document] = { val props = edge.getPropsWithTs().asScala - val filtered = props.filterKeys(globalIndex.propNamesSet) + val store = if (forceToIndex) true else props.exists(_._2.labelMeta.storeInGlobalIndex) - if (filtered.isEmpty) None + if (!store) None else { val doc = new Document() val id = edge.edgeId.toString @@ -78,8 +82,9 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES)) doc.add(new StringField(labelField, edge.label(), Field.Store.YES)) - filtered.foreach { case (dim, s2Property) => - val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO + props.foreach { case (dim, s2Property) => + val shouldIndex = if (s2Property.labelMeta.storeInGlobalIndex) Field.Store.YES else Field.Store.NO + val field = s2Property.labelMeta.dataType match { case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) @@ -91,39 +96,37 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { } } - override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType) + override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = + Future.successful(mutateEdges(edges, forceToIndex)) - globalIndexOptions.map { globalIndex => - val writer = getOrElseCreateIndexWriter(globalIndex.indexName) + override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = { + val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName) - vertices.foreach { vertex => - toDocument(globalIndex, vertex).foreach { doc => - writer.addDocument(doc) - } + vertices.foreach { vertex => + toDocument(vertex, forceToIndex).foreach { doc => + writer.addDocument(doc) } - - writer.commit() } + writer.commit() + vertices.map(_ => true) } - override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) + override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = + Future.successful(mutateVertices(vertices, forceToIndex)) - globalIndexOptions.map { globalIndex => - val writer = getOrElseCreateIndexWriter(globalIndex.indexName) + override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = { + val writer = getOrElseCreateIndexWriter(GlobalIndex.TableName) - edges.foreach { edge => - toDocument(globalIndex, edge).foreach { doc => - writer.addDocument(doc) - } + edges.foreach { edge => + toDocument(edge, forceToIndex).foreach { doc => + writer.addDocument(doc) } - - writer.commit() } + writer.commit() + edges.map(_ => true) } @@ -131,30 +134,28 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { val field = eidField val ids = new java.util.HashSet[EdgeId] - GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { globalIndex => - val queryString = buildQueryString(hasContainers) + val queryString = buildQueryString(hasContainers) - try { - val q = new QueryParser(field, analyzer).parse(queryString) + try { + val q = new QueryParser(field, analyzer).parse(queryString) - val reader = DirectoryReader.open(directories(globalIndex.indexName)) - val searcher = new IndexSearcher(reader) + val reader = DirectoryReader.open(directories(GlobalIndex.TableName)) + val searcher = new IndexSearcher(reader) - val docs = searcher.search(q, hitsPerPage) + val docs = searcher.search(q, hitsPerPage) - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get - ids.add(id); - } + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get + ids.add(id); + } - reader.close() + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) ids - } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids - } } new util.ArrayList[EdgeId](ids) @@ -163,31 +164,28 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { val field = vidField val ids = new java.util.HashSet[VertexId] + val queryString = buildQueryString(hasContainers) - GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { globalIndex => - val queryString = buildQueryString(hasContainers) + try { + val q = new QueryParser(field, analyzer).parse(queryString) - try { - val q = new QueryParser(field, analyzer).parse(queryString) + val reader = DirectoryReader.open(directories(GlobalIndex.TableName)) + val searcher = new IndexSearcher(reader) - val reader = DirectoryReader.open(directories(globalIndex.indexName)) - val searcher = new IndexSearcher(reader) + val docs = searcher.search(q, hitsPerPage) - val docs = searcher.search(q, hitsPerPage) - - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get - ids.add(id) - } + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get + ids.add(id) + } - reader.close() + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) ids - } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids - } } new util.ArrayList[VertexId](ids) @@ -200,9 +198,5 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) - - override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices)) - - override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala index f314083..974965f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala @@ -91,7 +91,8 @@ object Conversions { (JsPath \ "columnId").read[Int] and (JsPath \ "name").read[String] and (JsPath \ "seq").read[Byte] and - (JsPath \ "dataType").read[String] + (JsPath \ "dataType").read[String] and + (JsPath \ "storeGlobalIndex").read[Boolean] )(ColumnMeta.apply _) implicit val columnMetaWrites: Writes[ColumnMeta] = ( @@ -99,7 +100,8 @@ object Conversions { (JsPath \ "columnId").write[Int] and (JsPath \ "name").write[String] and (JsPath \ "seq").write[Byte] and - (JsPath \ "dataType").write[String] + (JsPath \ "dataType").write[String] and + (JsPath \ "storeGlobalIndex").write[Boolean] )(unlift(ColumnMeta.unapply)) /* Graph Class */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala index 14ad381..b764841 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala @@ -22,6 +22,8 @@ package org.apache.s2graph.core.mysqls import play.api.libs.json.Json import scalikejdbc._ +import scala.util.Try + object ColumnMeta extends Model[ColumnMeta] { val timeStampSeq = -1.toByte @@ -34,7 +36,8 @@ object ColumnMeta extends Model[ColumnMeta] { val reservedMetaNamesSet = reservedMetas.map(_.name).toSet def valueOf(rs: WrappedResultSet): ColumnMeta = { - ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase()) + ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), + rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.boolean("store_in_global_index")) } def findById(id: Int)(implicit session: DBSession = AutoSession) = { @@ -69,21 +72,25 @@ object ColumnMeta extends Model[ColumnMeta] { } } - def insert(columnId: Int, name: String, dataType: String)(implicit session: DBSession = AutoSession) = { + def insert(columnId: Int, name: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = { val ls = findAllByColumn(columnId, false) val seq = ls.size + 1 if (seq <= maxValue) { - sql"""insert into column_metas(column_id, name, seq, data_type) - select ${columnId}, ${name}, ${seq}, ${dataType}""" + sql"""insert into column_metas(column_id, name, seq, data_type, store_in_global_index) + select ${columnId}, ${name}, ${seq}, ${dataType}, ${storeInGlobalIndex}""" .updateAndReturnGeneratedKey.apply() } } - def findOrInsert(columnId: Int, name: String, dataType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = { + def findOrInsert(columnId: Int, + name: String, + dataType: String, + storeInGlobalIndex: Boolean = false, + useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = { findByName(columnId, name, useCache) match { case Some(c) => c case None => - insert(columnId, name, dataType) + insert(columnId, name, dataType, storeInGlobalIndex) expireCache(s"columnId=$columnId:name=$name") findByName(columnId, name).get } @@ -130,9 +137,20 @@ object ColumnMeta extends Model[ColumnMeta] { (cacheKey -> ls) }.toList) } + + def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try { + sql""" + update column_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id} + """.updateAndReturnGeneratedKey.apply() + } } -case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) { +case class ColumnMeta(id: Option[Int], + columnId: Int, + name: String, + seq: Byte, + dataType: String, + storeInGlobalIndex: Boolean = false) { lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType) override def equals(other: Any): Boolean = { if (!other.isInstanceOf[ColumnMeta]) false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index 97fd704..2956f89 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -204,8 +204,8 @@ object Label extends Model[Label] { hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, compressionAlgorithm, options).toInt - val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType) => - val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType) + val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) => + val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex) (propName -> labelMeta.seq) }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala index a16334a..920b432 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala @@ -28,6 +28,8 @@ import org.apache.s2graph.core.{GraphExceptions, JSONParser} import play.api.libs.json.Json import scalikejdbc._ +import scala.util.Try + object LabelMeta extends Model[LabelMeta] { /** dummy sequences */ @@ -78,7 +80,8 @@ object LabelMeta extends Model[LabelMeta] { val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp") def apply(rs: WrappedResultSet): LabelMeta = { - LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase) + LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), + rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index")) } /** Note: DegreeSeq should not be included in serializer/deserializer. @@ -124,13 +127,13 @@ object LabelMeta extends Model[LabelMeta] { } } - def insert(labelId: Int, name: String, defaultValue: String, dataType: String)(implicit session: DBSession = AutoSession) = { + def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = { val ls = findAllByLabelId(labelId, useCache = false) val seq = ls.size + 1 if (seq < maxValue) { - sql"""insert into label_metas(label_id, name, seq, default_value, data_type) - select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply() + sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index) + select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply() } else { throw MaxPropSizeReachedException("max property size reached") } @@ -139,12 +142,13 @@ object LabelMeta extends Model[LabelMeta] { def findOrInsert(labelId: Int, name: String, defaultValue: String, - dataType: String)(implicit session: DBSession = AutoSession): LabelMeta = { + dataType: String, + storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = { findByName(labelId, name) match { case Some(c) => c case None => - insert(labelId, name, defaultValue, dataType) + insert(labelId, name, defaultValue, dataType, storeInGlobalIndex) val cacheKey = "labelId=" + labelId + ":name=" + name val cacheKeys = "labelId=" + labelId expireCache(cacheKey) @@ -184,6 +188,12 @@ object LabelMeta extends Model[LabelMeta] { cacheKey -> ls }.toList) } + + def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try { + sql""" + update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id} + """.updateAndReturnGeneratedKey.apply() + } } case class LabelMeta(id: Option[Int], @@ -191,7 +201,8 @@ case class LabelMeta(id: Option[Int], name: String, seq: Byte, defaultValue: String, - dataType: String) { + dataType: String, + storeInGlobalIndex: Boolean = false) { lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType) override def equals(other: Any): Boolean = { if (!other.isInstanceOf[LabelMeta]) false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala index e745cb0..d7e9cea 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -123,4 +123,6 @@ case class Service(id: Option[Int], lazy val extraOptions = Model.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions) + + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index 08f075d..820f93a 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -95,14 +95,14 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { val vertexPropsKeys = List("age" -> "integer", "im" -> "string") vertexPropsKeys.map { case (key, keyType) => - Management.addVertexProp(testServiceName, testColumnName, key, keyType) + Management.addVertexProp(testServiceName, testColumnName, key, keyType, storeInGlobalIndex = true) } // vertex type global index. - val globalVertexIndex = management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", Seq("age")) +// val globalVertexIndex = management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", Seq("age")) // edge type global index. - val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test_weight_time_edge", Seq("weight", "time")) +// val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test_weight_time_edge", Seq("weight", "time")) logger.info("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala index 679f94a..9ed12e6 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala @@ -43,7 +43,7 @@ class IndexProviderTest extends IntegrateCommon { val testColumn = ServiceColumn.find(testService.id.get, TestUtil.testColumnName).get val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L) val indexPropsColumnMeta = testColumn.metasInvMap("age") - + ColumnMeta.updateStoreInGlobalIndex(indexPropsColumnMeta.id.get, storeInGlobalIndex = true) val propsWithTs = Map( indexPropsColumnMeta -> InnerVal.withInt(1, "v4") @@ -85,8 +85,11 @@ class IndexProviderTest extends IntegrateCommon { val otherVertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L) val vertex = graph.elementBuilder.newVertex(vertexId) val otherVertex = graph.elementBuilder.newVertex(otherVertexId) - val weightMeta = testLabel.metaPropsInvMap("weight") - val timeMeta = testLabel.metaPropsInvMap("time") + val weightMeta = testLabel.metaPropsInvMap("weight").copy(storeInGlobalIndex = true) + val timeMeta = testLabel.metaPropsInvMap("time").copy(storeInGlobalIndex = true) + + LabelMeta.updateStoreInGlobalIndex(weightMeta.id.get, storeInGlobalIndex = true) + LabelMeta.updateStoreInGlobalIndex(timeMeta.id.get, storeInGlobalIndex = true) val propsWithTs = Map( weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"), @@ -104,7 +107,7 @@ class IndexProviderTest extends IntegrateCommon { println(s"[# of edges]: ${edges.size}") edges.foreach(e => println(s"[Edge]: $e")) - indexProvider.mutateEdges(edges) + indexProvider.mutateEdges(edges, forceToIndex = true) // match (0 until numOfTry).foreach { _ => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala deleted file mode 100644 index 1213ad0..0000000 --- a/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.models - -import org.apache.s2graph.core.TestCommonWithModels -import org.apache.s2graph.core.mysqls.GlobalIndex -import org.apache.tinkerpop.gremlin.process.traversal.P -import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import scala.collection.JavaConversions._ - -class GlobalIndexTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll { - override def beforeAll(): Unit = { - initTests() - } - - override def afterAll(): Unit = { - graph.shutdown() - } - - test("test buildGlobalIndex.") { - Seq(GlobalIndex.EdgeType, GlobalIndex.VertexType).foreach { elementType => - management.buildGlobalIndex(elementType, "test_global", Seq("weight", "date", "name")) - } - } - - test("findGlobalIndex.") { - // (weight: 34) AND (weight: [1 TO 100]) -// Seq(GlobalIndex.EdgeType, GlobalIndex.VertexType).foreach { elementType => -// val idx1 = management.buildGlobalIndex(elementType, "test-1", Seq("weight", "age", "name")) -// val idx2 = management.buildGlobalIndex(elementType, "test-2", Seq("gender", "age")) -// val idx3 = management.buildGlobalIndex(elementType, "test-3", Seq("class")) -// -// var hasContainers = Seq( -// new HasContainer("weight", P.eq(Int.box(34))), -// new HasContainer("age", P.between(Int.box(1), Int.box(100))) -// ) -// -// GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(Option(idx1)) -// -// hasContainers = Seq( -// new HasContainer("gender", P.eq(Int.box(34))), -// new HasContainer("age", P.eq(Int.box(34))), -// new HasContainer("class", P.eq(Int.box(34))) -// ) -// -// GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(Option(idx2)) -// -// hasContainers = Seq( -// new HasContainer("class", P.eq(Int.box(34))), -// new HasContainer("_", P.eq(Int.box(34))) -// ) -// -// GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(Option(idx3)) -// -// hasContainers = Seq( -// new HasContainer("key", P.eq(Int.box(34))), -// new HasContainer("value", P.eq(Int.box(34))) -// ) -// -// GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(None) -// } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index bf3291e..ca78917 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -143,7 +143,7 @@ class S2GraphProvider extends AbstractGraphProvider { } ColumnMeta.findByName(defaultServiceColumn.id.get, "aKey", useCache = false).foreach(cm => ColumnMeta.delete(cm.id.get)) - ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, useCache = false) + ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, storeInGlobalIndex = true, useCache = false) } if (loadGraphWith != null && loadGraphWith.value() == GraphData.MODERN) { mnt.createLabel("knows", defaultService.serviceName, "person", "integer", defaultService.serviceName, "person", "integer", @@ -454,9 +454,6 @@ class S2GraphProvider extends AbstractGraphProvider { options = Option("""{"skipReverse": false}""") ) - Seq(GlobalIndex.VertexType, GlobalIndex.EdgeType).foreach { elementType => - mnt.buildGlobalIndex(elementType, "global", allProps.map(_.name).toSeq) - } super.loadGraphData(graph, loadGraphWith, testClass, testName) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63012f8e/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala index 18a6c85..45060cc 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala @@ -41,7 +41,6 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { initTests() val g = new S2Graph(config) - lazy val gIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "S2GraphTest2", Seq("weight")) def printEdges(edges: Seq[Edge]): Unit = { edges.foreach { edge => logger.debug(s"[FetchedEdge]: $edge") @@ -417,7 +416,6 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { //// } // } ignore("Modern") { - gIndex val mnt = graph.management S2GraphFactory.cleanupDefaultSchema
