[S2GRAPH-172] Suggest to implement GraphQL as standard web interface for S2Graph.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1f1dee3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1f1dee3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1f1dee3d Branch: refs/heads/master Commit: 1f1dee3d153ef42582c043f6bc324724157485f6 Parents: 3cc98da Author: daewon <[email protected]> Authored: Wed Jan 17 18:57:04 2018 +0900 Committer: daewon <[email protected]> Committed: Tue Feb 13 18:08:37 2018 +0900 ---------------------------------------------------------------------- CHANGES | 2 + build.sbt | 8 +- dev_support/graph_mysql/schema.sql | 2 + project/Common.scala | 2 + project/plugins.sbt | 3 +- s2core/build.sbt | 5 +- .../org/apache/s2graph/core/mysqls/schema.sql | 2 + .../org/apache/s2graph/core/Management.scala | 79 +-- .../scala/org/apache/s2graph/core/S2Edge.scala | 21 +- .../apache/s2graph/core/S2GraphFactory.scala | 48 +- .../org/apache/s2graph/core/S2GraphLike.scala | 11 +- .../s2graph/core/index/ESIndexProvider.scala | 213 ++++++++ .../s2graph/core/index/IndexProvider.scala | 216 +------- .../core/index/LuceneIndexProvider.scala | 225 ++++++++ .../apache/s2graph/core/io/Conversions.scala | 6 +- .../apache/s2graph/core/mysqls/ColumnMeta.scala | 32 +- .../s2graph/core/mysqls/GlobalIndex.scala | 7 +- .../org/apache/s2graph/core/mysqls/Label.scala | 7 +- .../apache/s2graph/core/mysqls/LabelMeta.scala | 27 +- .../apache/s2graph/core/mysqls/Service.scala | 3 + .../apache/s2graph/core/types/VertexId.scala | 12 +- .../core/Integrate/IntegrateCommon.scala | 10 +- .../s2graph/core/index/IndexProviderTest.scala | 239 +++++---- .../s2graph/core/models/GlobalIndexTest.scala | 81 --- .../core/tinkerpop/S2GraphProvider.scala | 6 +- .../core/tinkerpop/structure/S2GraphTest.scala | 2 - s2graphql/README.md | 530 +++++++++++++++++++ s2graphql/build.sbt | 21 + s2graphql/project/build.properties | 18 + s2graphql/project/plugins.sbt | 1 + s2graphql/src/main/resources/application.conf | 7 + s2graphql/src/main/resources/log4j.properties | 26 + s2graphql/src/main/scala/GraphQLServer.scala | 110 ++++ s2graphql/src/main/scala/GraphRepository.scala | 164 ++++++ s2graphql/src/main/scala/HttpServer.scala | 61 +++ s2graphql/src/main/scala/S2Type.scala | 519 ++++++++++++++++++ .../main/scala/SangriaPlayJsonScalarType.scala | 76 +++ s2graphql/src/main/scala/SchemaDef.scala | 36 ++ s2graphql/src/test/resources/application.conf | 0 .../rest/play/controllers/AdminController.scala | 8 +- 40 files changed, 2342 insertions(+), 504 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0594ff1..8384bdc 100644 --- a/CHANGES +++ b/CHANGES @@ -54,6 +54,8 @@ Release Notes - S2Graph - Version 0.2.0 * [S2GRAPH-123] - Support different index on out/in direction. * [S2GRAPH-125] - Add options field on Label model for controlling advanced options. * [S2GRAPH-166] - Provide embeddable storage backend using RocksDB. + * [S2GRAPH-175] - Provide Elastic Search Index Provider. + * [S2GRAPH-172] - Suggest to implement GraphQL as standard web interface for S2Graph. ** Task * [S2GRAPH-162] - Update year in the NOTICE file. http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index b1017dd..cdabfbf 100755 --- a/build.sbt +++ b/build.sbt @@ -48,6 +48,10 @@ lazy val s2rest_netty = project .dependsOn(s2core) .settings(commonSettings: _*) +lazy val s2graphql = project + .dependsOn(s2core) + .settings(commonSettings: _*) + lazy val s2core = project.settings(commonSettings: _*) lazy val spark = project.settings(commonSettings: _*) @@ -66,7 +70,7 @@ lazy val s2graph_gremlin = project.dependsOn(s2core) lazy val root = (project in file(".")) .aggregate(s2core, s2rest_play) - .dependsOn(s2rest_play, s2rest_netty, loader, s2counter_loader) // this enables packaging on the root project + .dependsOn(s2rest_play, s2rest_netty, loader, s2counter_loader, s2graphql) // this enables packaging on the root project .settings(commonSettings: _*) lazy val runRatTask = inputKey[Unit]("Runs Apache rat on S2Graph") @@ -103,4 +107,4 @@ releaseProcess := Seq[ReleaseStep]( releasePublishArtifactsAction := publishSigned.value -mainClass in Compile := None \ No newline at end of file +mainClass in Compile := None http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/dev_support/graph_mysql/schema.sql ---------------------------------------------------------------------- diff --git a/dev_support/graph_mysql/schema.sql b/dev_support/graph_mysql/schema.sql index ea72c63..48b27df 100644 --- a/dev_support/graph_mysql/schema.sql +++ b/dev_support/graph_mysql/schema.sql @@ -76,6 +76,7 @@ CREATE TABLE `column_metas` ( `name` varchar(64) NOT NULL, `seq` tinyint NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'string', + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), INDEX `idx_column_id_seq` (`column_id`, `seq`) @@ -147,6 +148,7 @@ CREATE TABLE `label_metas` ( `default_value` varchar(64) NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'long', `used_in_index` tinyint NOT NULL DEFAULT 0, + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_label_id_name` (`label_id`, `name`), INDEX `idx_label_id_seq` (`label_id`, `seq`) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/project/Common.scala ---------------------------------------------------------------------- diff --git a/project/Common.scala b/project/Common.scala index b965aed..d3b8d93 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -28,6 +28,8 @@ object Common { val hadoopVersion = "2.7.3" val tinkerpopVersion = "3.2.5" + val elastic4sVersion = "6.1.0" + /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */ val loggingRuntime = Seq( "log4j" % "log4j" % "1.2.17", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/project/plugins.sbt ---------------------------------------------------------------------- diff --git a/project/plugins.sbt b/project/plugins.sbt index a6e5381..c5a079f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -25,13 +25,12 @@ addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.9") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.7.0") // sbt revolver -addSbtPlugin("io.spray" % "sbt-revolver" % "0.8.0") +addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.3") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.0") - addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 804c57c..110d5e5 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -50,7 +50,10 @@ libraryDependencies ++= Seq( "org.apache.lucene" % "lucene-core" % "6.6.0", "org.apache.lucene" % "lucene-queryparser" % "6.6.0", "org.rocksdb" % "rocksdbjni" % "5.8.0", - "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0" + "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0", + "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(), + "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(), + "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging() ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql index f5f6a61..b66b46b 100644 --- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql +++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql @@ -65,6 +65,7 @@ CREATE TABLE `column_metas` ( `name` varchar(64) NOT NULL, `seq` tinyint NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'string', + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), INDEX `idx_column_id_seq` (`column_id`, `seq`) @@ -136,6 +137,7 @@ CREATE TABLE `label_metas` ( `default_value` varchar(64) NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'long', `used_in_index` tinyint NOT NULL DEFAULT 0, + `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_label_metas_label_id_name` (`label_id`, `name`), INDEX `idx_label_metas_label_id_seq` (`label_id`, `seq`) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 8d2d62a..c7d2d54 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -63,9 +63,9 @@ object Management { object JsonModel { - case class Prop(name: String, defaultValue: String, datatType: String) + case class Prop(name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false) - object Prop extends ((String, String, String) => Prop) + object Prop extends ((String, String, String, Boolean) => Prop) case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) } @@ -102,9 +102,10 @@ object Management { case Some(service) => val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) for { - Prop(propName, defaultValue, dataType) <- props + Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, useCache = false) + ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, + storeInGlobalIndex = storeInGlobalIndex, useCache = false) } } } @@ -165,7 +166,7 @@ object Management { val labelOpt = Label.findByName(labelStr) val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) - LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType) + LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.dataType, prop.storeInGlobalIndex) } } @@ -175,8 +176,8 @@ object Management { val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) props.map { - case Prop(propName, defaultValue, dataType) => - LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType) + case Prop(propName, defaultValue, dataType, storeInGlobalIndex) => + LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType, storeInGlobalIndex) } } } @@ -185,12 +186,13 @@ object Management { columnName: String, propsName: String, propsType: String, + storeInGlobalIndex: Boolean = false, schemaVersion: String = DEFAULT_VERSION): ColumnMeta = { val result = for { service <- Service.findByName(serviceName, useCache = false) serviceColumn <- ServiceColumn.find(service.id.get, columnName) } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType) + ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, storeInGlobalIndex) } result.getOrElse({ throw new RuntimeException(s"add property on vertex failed") @@ -361,9 +363,10 @@ class Management(graph: S2GraphLike) { case Some(service) => val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false) for { - Prop(propName, defaultValue, dataType) <- props + Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, useCache = false) + ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, + storeInGlobalIndex = storeInGlobalIndex, useCache = false) } serviceColumn } @@ -466,26 +469,42 @@ class Management(graph: S2GraphLike) { old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options) } - def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.VertexType, name, propNames) - - def buildGlobalVertexIndex(name: String, propNames: Seq[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.VertexType, name, propNames) - - def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) - - def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex = - buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) - - def buildGlobalIndex(elementType: String, name: String, propNames: Seq[String]): GlobalIndex = { - GlobalIndex.findBy(elementType, name, false) match { - case None => - GlobalIndex.insert(elementType, name, propNames) - GlobalIndex.findBy(elementType, name, false).get - case Some(oldIndex) => oldIndex - } - } + def enableVertexGlobalIndex(columnMeats: Seq[ColumnMeta]): Boolean = { + val successes = columnMeats.map { cm => + ColumnMeta.updateStoreInGlobalIndex(cm.id.get, cm.storeInGlobalIndex) + }.map(_.isSuccess) + + successes.forall(identity) + } + + def enableEdgeGlobalIndex(labelMetas: Seq[LabelMeta]): Boolean = { + val successes = labelMetas.map { lm => + LabelMeta.updateStoreInGlobalIndex(lm.id.get, lm.storeInGlobalIndex) + }.map(_.isSuccess) + + successes.forall(identity) + } + +// def buildGlobalVertexIndex(name: String, propNames: java.util.List[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.VertexType, name, propNames) +// +// def buildGlobalVertexIndex(name: String, propNames: Seq[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.VertexType, name, propNames) +// +// def buildGlobalEdgeIndex(name: String, propNames: java.util.List[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) +// +// def buildGlobalEdgeIndex(name: String, propNames: Seq[String]): GlobalIndex = +// buildGlobalIndex(GlobalIndex.EdgeType, name, propNames) +// +// def buildGlobalIndex(elementType: String, name: String, propNames: Seq[String]): GlobalIndex = { +// GlobalIndex.findBy(elementType, name, false) match { +// case None => +// GlobalIndex.insert(elementType, name, propNames) +// GlobalIndex.findBy(elementType, name, false).get +// case Some(oldIndex) => oldIndex +// } +// } def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for { label <- Try(Label.findByName(labelName, useCache = false).get) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 3eb1fa7..f52acf3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -377,20 +377,19 @@ case class S2Edge(override val innerGraph: S2GraphLike, object EdgeId { val EdgeIdDelimiter = "," def fromString(s: String): EdgeId = { -// val Array(src, tgt, labelName, dir, ts) = s.split(EdgeIdDelimiter) -// val label = Label.findByName(labelName).getOrElse(throw LabelNotExistException(labelName)) -// val srcColumn = label.srcColumnWithDir(GraphUtil.toDirection(dir)) -// val tgtColumn = label.tgtColumnWithDir(GraphUtil.toDirection(dir)) -// EdgeId( -// JSONParser.toInnerVal(src, srcColumn.columnType, label.schemaVersion), -// JSONParser.toInnerVal(tgt, tgtColumn.columnType, label.schemaVersion), -// labelName, -// dir, -// ts.toLong -// ) val js = Json.parse(s) s2EdgeIdReads.reads(Json.parse(s)).get } + + def isValid(edgeId: EdgeId): Option[EdgeId] = { + VertexId.isValid(edgeId.srcVertexId).flatMap { _ => + VertexId.isValid(edgeId.tgtVertexId).flatMap { _ => + Label.findByName(edgeId.labelName).map { _ => + edgeId + } + } + } + } } case class EdgeId(srcVertexId: VertexId, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala index a9c98c2..07a9be1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala @@ -69,30 +69,30 @@ object S2GraphFactory { val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false) val DefaultColumnMetas = { - ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", storeInGlobalIndex = true, useCache = false) } // Management.deleteLabel("_s2graph") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index bb36a33..f639e84 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -298,8 +298,15 @@ trait S2GraphLike extends Graph { belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { val vertex = elementBuilder.newVertex(id, ts, props, op, belongLabelIds) - val future = mutateVertices(Seq(vertex), withWait = true).map { rets => - if (rets.forall(_.isSuccess)) vertex + val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets => + if (rets.forall(_.isSuccess)) { + indexProvider.mutateVerticesAsync(Seq(vertex)).map { ls => + if (ls.forall(identity)) vertex + else { + throw new RuntimeException("indexVertex failed.") + } + } + } else throw new RuntimeException("addVertex failed.") } Await.ready(future, WaitTimeout) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala new file mode 100644 index 0000000..10c9222 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.index + +import java.util + +import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType} +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.HttpClient +import com.typesafe.config.Config +import org.apache.s2graph.core.io.Conversions +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer +import play.api.libs.json.Json + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.Try + +class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider { + import GlobalIndex._ + import IndexProvider._ + + import scala.collection.mutable + + implicit val executor = ec + + val esClientUri = Try(config.getString("es.index.provider.client.uri")).getOrElse("localhost") + val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200)) + + val WaitTime = Duration("60 seconds") + + private def toFields(vertex: S2VertexLike, forceToIndex: Boolean): Option[Map[String, Any]] = { + val props = vertex.props.asScala + val storeInGlobalIndex = if (forceToIndex) true else props.exists(_._2.columnMeta.storeInGlobalIndex) + + if (!storeInGlobalIndex) None + else { + val fields = mutable.Map.empty[String, Any] + + fields += (vidField -> vertex.id.toString()) + fields += (serviceField -> vertex.serviceName) + fields += (serviceColumnField -> vertex.columnName) + + props.foreach { case (dim, s2VertexProperty) => + // skip reserved fields. + if (s2VertexProperty.columnMeta.seq > 0) { + s2VertexProperty.columnMeta.dataType match { + case "string" => fields += (dim -> s2VertexProperty.innerVal.value.toString) + case _ => fields += (dim -> s2VertexProperty.innerVal.value) + } + } + } + + Option(fields.toMap) + } + } + + private def toFields(edge: S2EdgeLike, forceToIndex: Boolean): Option[Map[String, Any]] = { + val props = edge.getPropsWithTs().asScala + val store = if (forceToIndex) true else props.exists(_._2.labelMeta.storeInGlobalIndex) + + if (!store) None + else { + val fields = mutable.Map.empty[String, Any] + + fields += (eidField -> edge.edgeId.toString) + fields += (serviceField -> edge.serviceName) + fields += (labelField -> edge.label()) + + props.foreach { case (dim, s2Property) => + if (s2Property.labelMeta.seq > 0) { + s2Property.labelMeta.dataType match { + case "string" => fields += (dim -> s2Property.innerVal.value.toString) + case _ => fields += (dim -> s2Property.innerVal.value) + } + } + } + + Option(fields.toMap) + } + } + + override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { + val bulkRequests = vertices.flatMap { vertex => + toFields(vertex, forceToIndex).toSeq.map { fields => + update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields) + } + } + + if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true)) + else { + client.execute { + val requests = bulk(requests = bulkRequests) + + requests + }.map { ret => + ret match { + case Left(failure) => vertices.map(_ => false) + case Right(results) => vertices.map(_ => true) + } + } + } + } + + override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = + Await.result(mutateVerticesAsync(vertices, forceToIndex), WaitTime) + + override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = + Await.result(mutateEdgesAsync(edges, forceToIndex), WaitTime) + + override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = { + val bulkRequests = edges.flatMap { edge => + toFields(edge, forceToIndex).toSeq.map { fields => + update(edge.edgeId.toString()).in(new IndexAndType(GlobalIndex.EdgeIndexName, GlobalIndex.TypeName)).docAsUpsert(fields) + } + } + + if (bulkRequests.isEmpty) Future.successful(edges.map(_ => true)) + else { + client.execute { + bulk(bulkRequests) + }.map { ret => + ret match { + case Left(failure) => edges.map(_ => false) + case Right(results) => edges.map(_ => true) + } + } + } + } + + override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] = + Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime) + + override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = { + val field = eidField + val ids = new java.util.HashSet[EdgeId] + + val queryString = buildQueryString(hasContainers) + + client.execute { + search(GlobalIndex.EdgeIndexName).query(queryString) + }.map { ret => + ret match { + case Left(failure) => + case Right(results) => + results.result.hits.hits.foreach { searchHit => + searchHit.sourceAsMap.get(field).foreach { idValue => + val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get + + //TODO: Come up with better way to filter out hits with invalid meta. + EdgeId.isValid(id).foreach(ids.add) + } + } + } + + new util.ArrayList[EdgeId](ids) + } + } + + + override def fetchVertexIds(hasContainers: util.List[HasContainer]): util.List[VertexId] = + Await.result(fetchVertexIdsAsync(hasContainers), WaitTime) + + override def fetchVertexIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[VertexId]] = { + val field = vidField + val ids = new java.util.HashSet[VertexId] + + val queryString = buildQueryString(hasContainers) + + client.execute { + search(GlobalIndex.VertexIndexName).query(queryString) + }.map { ret => + ret match { + case Left(failure) => + case Right(results) => + results.result.hits.hits.foreach { searchHit => + searchHit.sourceAsMap.get(field).foreach { idValue => + val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get + //TODO: Come up with better way to filter out hits with invalid meta. + VertexId.isValid(id).foreach(ids.add) + } + } + } + + new util.ArrayList[VertexId](ids) + } + } + + override def shutdown(): Unit = { + client.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index 2411e65..ae632df 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -22,35 +22,30 @@ package org.apache.s2graph.core.index import java.util import com.typesafe.config.Config -import org.apache.lucene.analysis.standard.StandardAnalyzer -import org.apache.lucene.document._ -import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig} -import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} -import org.apache.lucene.search.IndexSearcher -import org.apache.lucene.store.{BaseDirectory, RAMDirectory} -import org.apache.s2graph.core.io.Conversions import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls._ -import org.apache.s2graph.core.types.{InnerValLike, VertexId} -import org.apache.s2graph.core.utils.logger -import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains, P} +import org.apache.s2graph.core.types.VertexId import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP} +import org.apache.tinkerpop.gremlin.process.traversal.{Compare, Contains} import org.apache.tinkerpop.gremlin.structure.T -import play.api.libs.json.Json -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try object IndexProvider { import GlobalIndex._ + //TODO: Fix Me val hitsPerPage = 100000 + val IdField = "id" - def apply(config: Config): IndexProvider = { - val indexProviderType = "lucene" -// if (config.hasPath("index.provider")) config.getString("index.provider") else "lucene" + def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = { + + val indexProviderType = Try { config.getString("index.provider") }.getOrElse("lucene") indexProviderType match { case "lucene" => new LuceneIndexProvider(config) + case "es" => new ESIndexProvider(config) } } @@ -127,194 +122,11 @@ trait IndexProvider { def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]] - def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] - def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] + def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] + def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] - def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] - def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] + def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] + def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] def shutdown(): Unit } - -class LuceneIndexProvider(config: Config) extends IndexProvider { - import IndexProvider._ - import scala.collection.mutable - import scala.collection.JavaConverters._ - import GlobalIndex._ - - val analyzer = new StandardAnalyzer() - val writers = mutable.Map.empty[String, IndexWriter] - val directories = mutable.Map.empty[String, BaseDirectory] - - private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = { - writers.getOrElseUpdate(indexName, { - val dir = directories.getOrElseUpdate(indexName, new RAMDirectory()) - val indexConfig = new IndexWriterConfig(analyzer) - new IndexWriter(dir, indexConfig) - }) - } - - private def toDocument(globalIndex: GlobalIndex, vertex: S2VertexLike): Option[Document] = { - val props = vertex.props.asScala - val exist = props.exists(t => globalIndex.propNamesSet(t._1)) - if (!exist) None - else { - val doc = new Document() - val id = vertex.id.toString - - doc.add(new StringField(vidField, id, Field.Store.YES)) - doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES)) - doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES)) - - props.foreach { case (dim, s2VertexProperty) => - val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO - val field = s2VertexProperty.columnMeta.dataType match { - case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) - case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) - } - doc.add(field) - } - - Option(doc) - } - } - - private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = { - val props = edge.getPropsWithTs().asScala - val exist = props.exists(t => globalIndex.propNamesSet(t._1)) - if (!exist) None - else { - val doc = new Document() - val id = edge.edgeId.toString - - doc.add(new StringField(eidField, id, Field.Store.YES)) - doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES)) - doc.add(new StringField(labelField, edge.label(), Field.Store.YES)) - - props.foreach { case (dim, s2Property) => - val shouldIndex = if (globalIndex.propNamesSet(dim)) Field.Store.YES else Field.Store.NO - val field = s2Property.labelMeta.dataType match { - case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) - case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) - } - doc.add(field) - } - - Option(doc) - } - } - - override def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.VertexType) - - globalIndexOptions.map { globalIndex => - val writer = getOrElseCreateIndexWriter(globalIndex.indexName) - - vertices.foreach { vertex => - toDocument(globalIndex, vertex).foreach { doc => - writer.addDocument(doc) - } - } - - writer.commit() - } - - vertices.map(_ => true) - } - - override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = { - val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType) - - globalIndexOptions.map { globalIndex => - val writer = getOrElseCreateIndexWriter(globalIndex.indexName) - - edges.foreach { edge => - toDocument(globalIndex, edge).foreach { doc => - writer.addDocument(doc) - } - } - - writer.commit() - } - - edges.map(_ => true) - } - - override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { - val field = eidField - val ids = new java.util.HashSet[EdgeId] - - GlobalIndex.findGlobalIndex(GlobalIndex.EdgeType, hasContainers).map { globalIndex => - val queryString = buildQueryString(hasContainers) - - try { - val q = new QueryParser(field, analyzer).parse(queryString) - - val reader = DirectoryReader.open(directories(globalIndex.indexName)) - val searcher = new IndexSearcher(reader) - - val docs = searcher.search(q, hitsPerPage) - - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get - ids.add(id); - } - - reader.close() - ids - } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids - } - } - - new util.ArrayList[EdgeId](ids) - } - - override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { - val field = vidField - val ids = new java.util.HashSet[VertexId] - - GlobalIndex.findGlobalIndex(GlobalIndex.VertexType, hasContainers).map { globalIndex => - val queryString = buildQueryString(hasContainers) - - try { - val q = new QueryParser(field, analyzer).parse(queryString) - - val reader = DirectoryReader.open(directories(globalIndex.indexName)) - val searcher = new IndexSearcher(reader) - - val docs = searcher.search(q, hitsPerPage) - - docs.scoreDocs.foreach { scoreDoc => - val document = searcher.doc(scoreDoc.doc) - val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get - ids.add(id) - } - - reader.close() - ids - } catch { - case ex: ParseException => - logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) - ids - } - } - - new util.ArrayList[VertexId](ids) - } - - override def shutdown(): Unit = { - writers.foreach { case (_, writer) => writer.close() } - } - - override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) - - override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) - - override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices)) - - override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges)) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala new file mode 100644 index 0000000..841331d --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.index + +import java.util + +import com.typesafe.config.Config +import org.apache.lucene.analysis.standard.StandardAnalyzer +import org.apache.lucene.document.{Document, Field, StringField} +import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term} +import org.apache.lucene.queryparser.classic.{ParseException, QueryParser} +import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.store.{BaseDirectory, RAMDirectory} +import org.apache.s2graph.core.io.Conversions +import org.apache.s2graph.core.mysqls.GlobalIndex +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike} +import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer +import play.api.libs.json.Json + +import scala.concurrent.Future + + +class LuceneIndexProvider(config: Config) extends IndexProvider { + import GlobalIndex._ + import IndexProvider._ + + import scala.collection.JavaConverters._ + import scala.collection.mutable + + val analyzer = new StandardAnalyzer() + val writers = mutable.Map.empty[String, IndexWriter] + val directories = mutable.Map.empty[String, BaseDirectory] + + private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = { + writers.getOrElseUpdate(indexName, { + val dir = directories.getOrElseUpdate(indexName, new RAMDirectory()) + val indexConfig = new IndexWriterConfig(analyzer) + new IndexWriter(dir, indexConfig) + }) + } + + private def toDocument(vertex: S2VertexLike, forceToIndex: Boolean): Option[Document] = { + val props = vertex.props.asScala + val storeInGlobalIndex = if (forceToIndex) true else props.exists(_._2.columnMeta.storeInGlobalIndex) + + if (!storeInGlobalIndex) None + else { + val doc = new Document() + val id = vertex.id.toString + + doc.add(new StringField(vidField, id, Field.Store.YES)) + doc.add(new StringField(serviceField, vertex.serviceName, Field.Store.YES)) + doc.add(new StringField(serviceColumnField, vertex.columnName, Field.Store.YES)) + + props.foreach { case (dim, s2VertexProperty) => + val columnMeta = s2VertexProperty.columnMeta + if (columnMeta.seq > 0) { + val shouldIndex = if (columnMeta.storeInGlobalIndex) Field.Store.YES else Field.Store.NO + + val field = columnMeta.dataType match { + case "string" => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2VertexProperty.innerVal.value.toString, shouldIndex) + } + doc.add(field) + } + } + + Option(doc) + } + } + + private def toDocument(edge: S2EdgeLike, forceToIndex: Boolean): Option[Document] = { + val props = edge.getPropsWithTs().asScala + val store = if (forceToIndex) true else props.exists(_._2.labelMeta.storeInGlobalIndex) + + if (!store) None + else { + val doc = new Document() + val id = edge.edgeId.toString + + doc.add(new StringField(eidField, id, Field.Store.YES)) + doc.add(new StringField(serviceField, edge.serviceName, Field.Store.YES)) + doc.add(new StringField(labelField, edge.label(), Field.Store.YES)) + + props.foreach { case (dim, s2Property) => + val shouldIndex = if (s2Property.labelMeta.storeInGlobalIndex) Field.Store.YES else Field.Store.NO + + val field = s2Property.labelMeta.dataType match { + case "string" => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) + case _ => new StringField(dim, s2Property.innerVal.value.toString, shouldIndex) + } + doc.add(field) + } + + Option(doc) + } + } + + override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = + Future.successful(mutateEdges(edges, forceToIndex)) + + override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = { + val writer = getOrElseCreateIndexWriter(GlobalIndex.VertexIndexName) + + vertices.foreach { vertex => + toDocument(vertex, forceToIndex).foreach { doc => + val vId = vertex.id.toString() + + writer.updateDocument(new Term(IdField, vId), doc) + } + } + + writer.commit() + + vertices.map(_ => true) + } + + override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = + Future.successful(mutateVertices(vertices, forceToIndex)) + + override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = { + val writer = getOrElseCreateIndexWriter(GlobalIndex.EdgeIndexName) + + edges.foreach { edge => + toDocument(edge, forceToIndex).foreach { doc => + val eId = edge.edgeId.toString + + writer.updateDocument(new Term(IdField, eId), doc) + } + } + + writer.commit() + + edges.map(_ => true) + } + + override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = { + val field = eidField + val ids = new java.util.HashSet[EdgeId] + + val queryString = buildQueryString(hasContainers) + + try { + val q = new QueryParser(field, analyzer).parse(queryString) + + val reader = DirectoryReader.open(directories(GlobalIndex.EdgeIndexName)) + val searcher = new IndexSearcher(reader) + + val docs = searcher.search(q, hitsPerPage) + + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get + ids.add(id); + } + + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + ids + } + + new util.ArrayList[EdgeId](ids) + } + + override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = { + val field = vidField + val ids = new java.util.HashSet[VertexId] + val queryString = buildQueryString(hasContainers) + + try { + val q = new QueryParser(field, analyzer).parse(queryString) + + val reader = DirectoryReader.open(directories(GlobalIndex.VertexIndexName)) + val searcher = new IndexSearcher(reader) + + val docs = searcher.search(q, hitsPerPage) + + docs.scoreDocs.foreach { scoreDoc => + val document = searcher.doc(scoreDoc.doc) + val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get + ids.add(id) + } + + reader.close() + ids + } catch { + case ex: ParseException => + logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex) + ids + } + + new util.ArrayList[VertexId](ids) + } + + override def shutdown(): Unit = { + writers.foreach { case (_, writer) => writer.close() } + } + + override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers)) + + override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers)) +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala index f314083..974965f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala @@ -91,7 +91,8 @@ object Conversions { (JsPath \ "columnId").read[Int] and (JsPath \ "name").read[String] and (JsPath \ "seq").read[Byte] and - (JsPath \ "dataType").read[String] + (JsPath \ "dataType").read[String] and + (JsPath \ "storeGlobalIndex").read[Boolean] )(ColumnMeta.apply _) implicit val columnMetaWrites: Writes[ColumnMeta] = ( @@ -99,7 +100,8 @@ object Conversions { (JsPath \ "columnId").write[Int] and (JsPath \ "name").write[String] and (JsPath \ "seq").write[Byte] and - (JsPath \ "dataType").write[String] + (JsPath \ "dataType").write[String] and + (JsPath \ "storeGlobalIndex").write[Boolean] )(unlift(ColumnMeta.unapply)) /* Graph Class */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala index 14ad381..b764841 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala @@ -22,6 +22,8 @@ package org.apache.s2graph.core.mysqls import play.api.libs.json.Json import scalikejdbc._ +import scala.util.Try + object ColumnMeta extends Model[ColumnMeta] { val timeStampSeq = -1.toByte @@ -34,7 +36,8 @@ object ColumnMeta extends Model[ColumnMeta] { val reservedMetaNamesSet = reservedMetas.map(_.name).toSet def valueOf(rs: WrappedResultSet): ColumnMeta = { - ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase()) + ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), + rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.boolean("store_in_global_index")) } def findById(id: Int)(implicit session: DBSession = AutoSession) = { @@ -69,21 +72,25 @@ object ColumnMeta extends Model[ColumnMeta] { } } - def insert(columnId: Int, name: String, dataType: String)(implicit session: DBSession = AutoSession) = { + def insert(columnId: Int, name: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = { val ls = findAllByColumn(columnId, false) val seq = ls.size + 1 if (seq <= maxValue) { - sql"""insert into column_metas(column_id, name, seq, data_type) - select ${columnId}, ${name}, ${seq}, ${dataType}""" + sql"""insert into column_metas(column_id, name, seq, data_type, store_in_global_index) + select ${columnId}, ${name}, ${seq}, ${dataType}, ${storeInGlobalIndex}""" .updateAndReturnGeneratedKey.apply() } } - def findOrInsert(columnId: Int, name: String, dataType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = { + def findOrInsert(columnId: Int, + name: String, + dataType: String, + storeInGlobalIndex: Boolean = false, + useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = { findByName(columnId, name, useCache) match { case Some(c) => c case None => - insert(columnId, name, dataType) + insert(columnId, name, dataType, storeInGlobalIndex) expireCache(s"columnId=$columnId:name=$name") findByName(columnId, name).get } @@ -130,9 +137,20 @@ object ColumnMeta extends Model[ColumnMeta] { (cacheKey -> ls) }.toList) } + + def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try { + sql""" + update column_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id} + """.updateAndReturnGeneratedKey.apply() + } } -case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) { +case class ColumnMeta(id: Option[Int], + columnId: Int, + name: String, + seq: Byte, + dataType: String, + storeInGlobalIndex: Boolean = false) { lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType) override def equals(other: Any): Boolean = { if (!other.isInstanceOf[ColumnMeta]) false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala index a918db5..501a964 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala @@ -33,7 +33,10 @@ object GlobalIndex extends Model[GlobalIndex] { val VertexType = "vertex" val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField) - val TableName = "global_indices" +// val IndexName = "global_indices" + val VertexIndexName = "global_vertex_index" + val EdgeIndexName = "global_edge_index" + val TypeName = "test" def apply(rs: WrappedResultSet): GlobalIndex = { GlobalIndex(rs.intOpt("id"), @@ -91,5 +94,7 @@ case class GlobalIndex(id: Option[Int], elementType: String, propNames: Seq[String], indexName: String) { + val backendIndexName = indexName + "_" + elementType + val backendIndexNameWithType = backendIndexName + "/test1" lazy val propNamesSet = propNames.toSet } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index 97fd704..c128163 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -204,8 +204,8 @@ object Label extends Model[Label] { hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, compressionAlgorithm, options).toInt - val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType) => - val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType) + val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) => + val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex) (propName -> labelMeta.seq) }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap @@ -231,14 +231,17 @@ object Label extends Model[Label] { def findAll()(implicit session: DBSession = AutoSession) = { val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply() + putsToCache(ls.map { x => val cacheKey = s"id=${x.id.get}" (cacheKey -> x) }) + putsToCache(ls.map { x => val cacheKey = s"label=${x.label}" (cacheKey -> x) }) + ls } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala index a16334a..c715c47 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala @@ -28,6 +28,8 @@ import org.apache.s2graph.core.{GraphExceptions, JSONParser} import play.api.libs.json.Json import scalikejdbc._ +import scala.util.Try + object LabelMeta extends Model[LabelMeta] { /** dummy sequences */ @@ -78,7 +80,8 @@ object LabelMeta extends Model[LabelMeta] { val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp") def apply(rs: WrappedResultSet): LabelMeta = { - LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase) + LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), + rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index")) } /** Note: DegreeSeq should not be included in serializer/deserializer. @@ -124,13 +127,13 @@ object LabelMeta extends Model[LabelMeta] { } } - def insert(labelId: Int, name: String, defaultValue: String, dataType: String)(implicit session: DBSession = AutoSession) = { + def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = { val ls = findAllByLabelId(labelId, useCache = false) val seq = ls.size + 1 if (seq < maxValue) { - sql"""insert into label_metas(label_id, name, seq, default_value, data_type) - select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply() + sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index) + select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply() } else { throw MaxPropSizeReachedException("max property size reached") } @@ -139,12 +142,13 @@ object LabelMeta extends Model[LabelMeta] { def findOrInsert(labelId: Int, name: String, defaultValue: String, - dataType: String)(implicit session: DBSession = AutoSession): LabelMeta = { + dataType: String, + storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = { findByName(labelId, name) match { case Some(c) => c case None => - insert(labelId, name, defaultValue, dataType) + insert(labelId, name, defaultValue, dataType, storeInGlobalIndex) val cacheKey = "labelId=" + labelId + ":name=" + name val cacheKeys = "labelId=" + labelId expireCache(cacheKey) @@ -183,6 +187,14 @@ object LabelMeta extends Model[LabelMeta] { val cacheKey = s"labelId=${labelId}" cacheKey -> ls }.toList) + + ls + } + + def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try { + sql""" + update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id} + """.updateAndReturnGeneratedKey.apply() } } @@ -191,7 +203,8 @@ case class LabelMeta(id: Option[Int], name: String, seq: Byte, defaultValue: String, - dataType: String) { + dataType: String, + storeInGlobalIndex: Boolean = false) { lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType) override def equals(other: Any): Boolean = { if (!other.isInstanceOf[LabelMeta]) false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala index e745cb0..286aa37 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -95,6 +95,7 @@ object Service extends Model[Service] { val cacheKey = s"serviceName=${x.serviceName}" (cacheKey -> x) }) + ls } @@ -123,4 +124,6 @@ case class Service(id: Option[Int], lazy val extraOptions = Model.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions) + + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala index eb00405..b1de0c4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala @@ -59,14 +59,14 @@ object VertexId extends HBaseDeserializable { } def fromString(s: String): VertexId = { - -// val Array(serviceId, columnName, innerValStr) = s.split(S2Vertex.VertexLabelDelimiter) -// val service = Service.findById(serviceId.toInt) -// val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new LabelNotExistException(columnName)) -// val innerId = JSONParser.toInnerVal(innerValStr, column.columnType, column.schemaVersion) -// VertexId(column, innerId) s2VertexIdReads.reads(Json.parse(s)).get } + + def isValid(vertexId: VertexId): Option[VertexId] = { + ServiceColumn.find(vertexId.column.serviceId, vertexId.column.columnName).map { column => + vertexId + } + } } class VertexId (val column: ServiceColumn, val innerId: InnerValLike) extends HBaseSerializable with Comparable[VertexId] { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index c720b9f..820f93a 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.Integrate import com.typesafe.config._ -import org.apache.s2graph.core.mysqls.{Label, Model} +import org.apache.s2graph.core.mysqls.{GlobalIndex, Label, Model} import org.apache.s2graph.core.rest.{RequestParser, RestHandler} import org.apache.s2graph.core.utils.logger import org.apache.s2graph.core._ @@ -95,9 +95,15 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { val vertexPropsKeys = List("age" -> "integer", "im" -> "string") vertexPropsKeys.map { case (key, keyType) => - Management.addVertexProp(testServiceName, testColumnName, key, keyType) + Management.addVertexProp(testServiceName, testColumnName, key, keyType, storeInGlobalIndex = true) } + // vertex type global index. +// val globalVertexIndex = management.buildGlobalIndex(GlobalIndex.VertexType, "test_age_index", Seq("age")) + + // edge type global index. +// val globalEdgeIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test_weight_time_edge", Seq("weight", "time")) + logger.info("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala index 8294a8f..4889412 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.index import org.apache.s2graph.core.Integrate.IntegrateCommon -import org.apache.s2graph.core.{Query, QueryParam, S2Vertex, Step} +import org.apache.s2graph.core._ import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.apache.tinkerpop.gremlin.process.traversal.{Order, P} import org.apache.s2graph.core.mysqls._ @@ -31,120 +31,135 @@ import org.apache.tinkerpop.gremlin.structure.T import scala.collection.JavaConversions._ class IndexProviderTest extends IntegrateCommon { + import scala.concurrent.ExecutionContext.Implicits.global val indexProvider = IndexProvider(config) val numOfTry = 1 - lazy val gIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "test1", Seq("_timestamp", "weight", "time")) - -// test("test vertex write/query") { -// gIndex -// import TestUtil._ -//// Management.addVertexProp(testServiceName, testColumnName, "time", "long") -// -// val testService = Service.findByName(TestUtil.testServiceName).get -// val testColumn = ServiceColumn.find(testService.id.get, TestUtil.testColumnName).get -// val vertexId = graph.newVertexId(testServiceName)(testColumnName)(1L) -// -// val propsWithTs = Map( -//// testColumn.metasInvMap("time") -> InnerVal.withLong(1L, "v4") -// ColumnMeta.timestamp -> InnerVal.withLong(1L, "v4") -// ) -// val otherPropsWithTs = Map( -//// testColumn.metasInvMap("time") -> InnerVal.withLong(2L, "v4") -// ColumnMeta.timestamp -> InnerVal.withLong(2L, "v4") -// ) -// val vertex = graph.newVertex(vertexId) -// S2Vertex.fillPropsWithTs(vertex, propsWithTs) -// -// val otherVertex = graph.newVertex(vertexId) -// S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs) -// -// val numOfOthers = 10 -// val vertices = Seq(vertex) ++ (0 until numOfOthers).map(_ => otherVertex) -// -// println(s"[# of vertices]: ${vertices.size}") -// vertices.foreach(v => println(s"[Vertex]: $v")) -// indexProvider.mutateVertices(vertices) -// -// (0 until numOfTry).foreach { ith => -// val hasContainer = new HasContainer("_timestamp", P.eq(Long.box(1))) -// -// var ids = indexProvider.fetchVertexIds(Seq(hasContainer)) -// ids.head shouldBe vertex.id -// -// ids.foreach { id => -// println(s"[Id]: $id") -// } -// } -// } -// -// test("test edge write/query ") { -// import TestUtil._ -// val testLabelName = TestUtil.testLabelName -// val testLabel = Label.findByName(testLabelName).getOrElse(throw new IllegalArgumentException) -// val vertexId = graph.newVertexId(testServiceName)(testColumnName)(1L) -// val otherVertexId = graph.newVertexId(testServiceName)(testColumnName)(2L) -// val vertex = graph.newVertex(vertexId) -// val otherVertex = graph.newVertex(otherVertexId) -// -// val propsWithTs = Map( -// LabelMeta.timestamp -> InnerValLikeWithTs.withLong(1L, 1L, "v4"), -// testLabel.metaPropsInvMap("time") -> InnerValLikeWithTs.withLong(10L, 1L, "v4") -// ) -// val otherPropsWithTs = Map( -// LabelMeta.timestamp -> InnerValLikeWithTs.withLong(2L, 2L, "v4"), -// testLabel.metaPropsInvMap("time") -> InnerValLikeWithTs.withLong(20L, 2L, "v4") -// ) -// val edge = graph.newEdge(vertex, vertex, testLabel, 0, propsWithTs = propsWithTs) -// val otherEdge = graph.newEdge(otherVertex, otherVertex, testLabel, 0, propsWithTs = otherPropsWithTs) -// val numOfOthers = 10 -// val edges = Seq(edge) ++ (0 until numOfOthers).map(_ => otherEdge) -// -// println(s"[# of edges]: ${edges.size}") -// edges.foreach(e => println(s"[Edge]: $e")) -// indexProvider.mutateEdges(edges) -// -// // match -// (0 until numOfTry).foreach { _ => -// val hasContainers = Seq(new HasContainer("time", P.eq(Int.box(10))), -// new HasContainer("_timestamp", P.eq(Int.box(1)))) -// val ids = indexProvider.fetchEdgeIds(hasContainers) -// ids.head shouldBe edge.edgeId -// -// ids.foreach { id => -// println(s"[Id]: $id") -// } -// } -// -// // match and not -// (0 until numOfTry).foreach { _ => -// val hasContainers = Seq(new HasContainer("time", P.eq(Int.box(20))), -// new HasContainer("_timestamp", P.neq(Int.box(1)))) -// val ids = indexProvider.fetchEdgeIds(hasContainers) -// // ids.size shouldBe 0 -// // distinct make ids size to 1 -//// ids.size shouldBe numOfOthers -// -// ids.foreach { id => -// id shouldBe otherEdge.edgeId -// println(s"[Id]: $id") -// } -// } -// -// // range -// (0 until numOfTry).foreach { _ => -// val hasContainers = Seq(new HasContainer("time", -// P.inside(Int.box(0), Int.box(11)))) -// val ids = indexProvider.fetchEdgeIds(hasContainers) -// // ids.size shouldBe 0 -// ids.size shouldBe 1 -// -// ids.foreach { id => -// id shouldBe edge.edgeId -// println(s"[Id]: $id") -// } -// } -// } + + test("test vertex write/query") { + import TestUtil._ + + val testService = Service.findByName(TestUtil.testServiceName).get + val testColumn = ServiceColumn.find(testService.id.get, TestUtil.testColumnName).get + val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L) + val indexPropsColumnMeta = testColumn.metasInvMap("age") + + val propsWithTs = Map( + indexPropsColumnMeta -> InnerVal.withInt(1, "v4") + ) + val otherPropsWithTs = Map( + indexPropsColumnMeta -> InnerVal.withInt(2, "v4") + ) + val vertex = graph.elementBuilder.newVertex(vertexId) + S2Vertex.fillPropsWithTs(vertex, propsWithTs) + + val otherVertex = graph.elementBuilder.newVertex(vertexId) + S2Vertex.fillPropsWithTs(otherVertex, otherPropsWithTs) + + val numOfOthers = 20 + val vertices = Seq(vertex) ++ (10 until numOfOthers).map { ith => + val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(ith) + + val v = graph.elementBuilder.newVertex(vertexId) + S2Vertex.fillPropsWithTs(v, otherPropsWithTs) + + v + } + + println(s"[# of vertices]: ${vertices.size}") + vertices.foreach(v => println(s"[Vertex]: $v, ${v.props}")) + indexProvider.mutateVertices(vertices, forceToIndex = true) + + // enough time for elastic search to persist docs. + Thread.sleep(1000) + + (0 until numOfTry).foreach { ith => + val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1))) + + var ids = indexProvider.fetchVertexIds(Seq(hasContainer)) + ids.head shouldBe vertex.id + + ids.foreach { id => + println(s"[Id]: $id") + } + } + } + + test("test edge write/query ") { + import TestUtil._ + val testLabelName = TestUtil.testLabelName + val testLabel = Label.findByName(testLabelName).getOrElse(throw new IllegalArgumentException) + + val vertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(1L) + val otherVertexId = graph.elementBuilder.newVertexId(testServiceName)(testColumnName)(2L) + val vertex = graph.elementBuilder.newVertex(vertexId) + val otherVertex = graph.elementBuilder.newVertex(otherVertexId) + val weightMeta = testLabel.metaPropsInvMap("weight") + val timeMeta = testLabel.metaPropsInvMap("time") + + val propsWithTs = Map( + weightMeta -> InnerValLikeWithTs.withLong(1L, 1L, "v4"), + timeMeta -> InnerValLikeWithTs.withLong(10L, 1L, "v4") + ) + val otherPropsWithTs = Map( + weightMeta -> InnerValLikeWithTs.withLong(2L, 2L, "v4"), + timeMeta -> InnerValLikeWithTs.withLong(20L, 2L, "v4") + ) + + val edge = graph.elementBuilder.newEdge(vertex, vertex, testLabel, 0, propsWithTs = propsWithTs) + val otherEdge = graph.elementBuilder.newEdge(otherVertex, otherVertex, testLabel, 0, propsWithTs = otherPropsWithTs) + val numOfOthers = 10 + val edges = Seq(edge) ++ (0 until numOfOthers).map(_ => otherEdge) + + println(s"[# of edges]: ${edges.size}") + edges.foreach(e => println(s"[Edge]: $e")) + indexProvider.mutateEdges(edges, forceToIndex = true) + + // enough time for elastic search to persist docs. + Thread.sleep(1000) + + // match + (0 until numOfTry).foreach { _ => + val hasContainers = Seq(new HasContainer(timeMeta.name, P.eq(Int.box(10))), + new HasContainer(weightMeta.name, P.eq(Int.box(1)))) + + val ids = indexProvider.fetchEdgeIds(hasContainers) + ids.head shouldBe edge.edgeId + + ids.foreach { id => + println(s"[Id]: $id") + } + } + + // match and not + (0 until numOfTry).foreach { _ => + val hasContainers = Seq(new HasContainer(timeMeta.name, P.eq(Int.box(20))), + new HasContainer(weightMeta.name, P.neq(Int.box(1)))) + val ids = indexProvider.fetchEdgeIds(hasContainers) + // ids.size shouldBe 0 + // distinct make ids size to 1 +// ids.size shouldBe numOfOthers + + ids.foreach { id => + id shouldBe otherEdge.edgeId + println(s"[Id]: $id") + } + } + + // range + (0 until numOfTry).foreach { _ => + val hasContainers = Seq(new HasContainer(timeMeta.name, + P.inside(Int.box(0), Int.box(11)))) + val ids = indexProvider.fetchEdgeIds(hasContainers) + // ids.size shouldBe 0 + ids.size shouldBe 1 + + ids.foreach { id => + id shouldBe edge.edgeId + println(s"[Id]: $id") + } + } + } test("buildQuerySingleString") { // (weight: 34) AND (weight: [0.5 TO *] AND price: 30) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala deleted file mode 100644 index a732564..0000000 --- a/s2core/src/test/scala/org/apache/s2graph/core/models/GlobalIndexTest.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.models - -import org.apache.s2graph.core.TestCommonWithModels -import org.apache.s2graph.core.mysqls.GlobalIndex -import org.apache.tinkerpop.gremlin.process.traversal.P -import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import scala.collection.JavaConversions._ - -class GlobalIndexTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll { - override def beforeAll(): Unit = { - initTests() - } - - override def afterAll(): Unit = { - graph.shutdown() - } - - test("test buildGlobalIndex.") { - Seq(GlobalIndex.EdgeType, GlobalIndex.VertexType).foreach { elementType => - management.buildGlobalIndex(elementType, "test_global", Seq("weight", "date", "name")) - } - } - - test("findGlobalIndex.") { - // (weight: 34) AND (weight: [1 TO 100]) - Seq(GlobalIndex.EdgeType, GlobalIndex.VertexType).foreach { elementType => - val idx1 = management.buildGlobalIndex(elementType, "test-1", Seq("weight", "age", "name")) - val idx2 = management.buildGlobalIndex(elementType, "test-2", Seq("gender", "age")) - val idx3 = management.buildGlobalIndex(elementType, "test-3", Seq("class")) - - var hasContainers = Seq( - new HasContainer("weight", P.eq(Int.box(34))), - new HasContainer("age", P.between(Int.box(1), Int.box(100))) - ) - - GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(Option(idx1)) - - hasContainers = Seq( - new HasContainer("gender", P.eq(Int.box(34))), - new HasContainer("age", P.eq(Int.box(34))), - new HasContainer("class", P.eq(Int.box(34))) - ) - - GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(Option(idx2)) - - hasContainers = Seq( - new HasContainer("class", P.eq(Int.box(34))), - new HasContainer("_", P.eq(Int.box(34))) - ) - - GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(Option(idx3)) - - hasContainers = Seq( - new HasContainer("key", P.eq(Int.box(34))), - new HasContainer("value", P.eq(Int.box(34))) - ) - - GlobalIndex.findGlobalIndex(elementType, hasContainers) shouldBe(None) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index 3a73d20..ca78917 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -96,6 +96,7 @@ class S2GraphProvider extends AbstractGraphProvider { val defaultServiceColumn = ServiceColumn.find(defaultService.id.get, S2Graph.DefaultColumnName).getOrElse(throw new IllegalStateException("default column is not initialized.")) val allProps = scala.collection.mutable.Set.empty[Prop] + var knowsProp = Vector( Prop("weight", "0.0", "double"), Prop("data", "-", "string"), @@ -142,7 +143,7 @@ class S2GraphProvider extends AbstractGraphProvider { } ColumnMeta.findByName(defaultServiceColumn.id.get, "aKey", useCache = false).foreach(cm => ColumnMeta.delete(cm.id.get)) - ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, useCache = false) + ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, storeInGlobalIndex = true, useCache = false) } if (loadGraphWith != null && loadGraphWith.value() == GraphData.MODERN) { mnt.createLabel("knows", defaultService.serviceName, "person", "integer", defaultService.serviceName, "person", "integer", @@ -453,9 +454,6 @@ class S2GraphProvider extends AbstractGraphProvider { options = Option("""{"skipReverse": false}""") ) - Seq(GlobalIndex.VertexType, GlobalIndex.EdgeType).foreach { elementType => - mnt.buildGlobalIndex(elementType, "global", allProps.map(_.name).toSeq) - } super.loadGraphData(graph, loadGraphWith, testClass, testName) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1f1dee3d/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala index 18a6c85..45060cc 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala @@ -41,7 +41,6 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { initTests() val g = new S2Graph(config) - lazy val gIndex = management.buildGlobalIndex(GlobalIndex.EdgeType, "S2GraphTest2", Seq("weight")) def printEdges(edges: Seq[Edge]): Unit = { edges.foreach { edge => logger.debug(s"[FetchedEdge]: $edge") @@ -417,7 +416,6 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels { //// } // } ignore("Modern") { - gIndex val mnt = graph.management S2GraphFactory.cleanupDefaultSchema
