Repository: incubator-s2graph Updated Branches: refs/heads/master ddfd10daa -> 7544a0565
add Vertex upsert on schemaVersion v4. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/50c9eaf8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/50c9eaf8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/50c9eaf8 Branch: refs/heads/master Commit: 50c9eaf89d2f5ffffdc51b48e82015baf138a517 Parents: f1e6be2 Author: DO YUNG YOON <[email protected]> Authored: Mon Feb 26 19:00:23 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Feb 26 19:00:23 2018 +0900 ---------------------------------------------------------------------- dev_support/graph_mysql/schema.sql | 1 + .../org/apache/s2graph/core/mysqls/schema.sql | 1 + .../org/apache/s2graph/core/Management.scala | 16 +- .../s2graph/core/S2EdgePropertyHelper.scala | 5 +- .../apache/s2graph/core/S2GraphFactory.scala | 48 +-- .../org/apache/s2graph/core/S2VertexLike.scala | 17 +- .../apache/s2graph/core/S2VertexProperty.scala | 3 + .../s2graph/core/S2VertexPropertyHelper.scala | 42 +++ .../apache/s2graph/core/io/Conversions.scala | 2 + .../apache/s2graph/core/mysqls/ColumnMeta.scala | 19 +- .../apache/s2graph/core/mysqls/Service.scala | 3 +- .../s2graph/core/mysqls/ServiceColumn.scala | 12 + .../core/storage/rocks/RocksHelper.scala | 2 +- .../storage/rocks/RocksStorageReadable.scala | 54 ++- .../vertex/tall/VertexDeserializable.scala | 14 +- .../serde/vertex/tall/VertexSerializable.scala | 20 +- .../core/Integrate/IntegrateCommon.scala | 9 +- .../core/tinkerpop/S2GraphProvider.scala | 18 +- s2graphql/src/main/resources/application.conf | 8 + s2graphql/src/main/scala/GraphQLServer.scala | 2 +- s2graphql/src/main/scala/GraphRepository.scala | 62 +++- s2graphql/src/main/scala/S2Type.scala | 343 +++++++++++++++---- .../rest/play/controllers/AdminController.scala | 2 +- 23 files changed, 557 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/dev_support/graph_mysql/schema.sql ---------------------------------------------------------------------- diff --git a/dev_support/graph_mysql/schema.sql b/dev_support/graph_mysql/schema.sql index 48b27df..5781f3b 100644 --- a/dev_support/graph_mysql/schema.sql +++ b/dev_support/graph_mysql/schema.sql @@ -76,6 +76,7 @@ CREATE TABLE `column_metas` ( `name` varchar(64) NOT NULL, `seq` tinyint NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'string', + `default_value` varchar(64) NOT NULL DEFAULT '', `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql index b66b46b..6b9b71e 100644 --- a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql +++ b/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql @@ -65,6 +65,7 @@ CREATE TABLE `column_metas` ( `name` varchar(64) NOT NULL, `seq` tinyint NOT NULL, `data_type` varchar(8) NOT NULL DEFAULT 'string', + `default_value` varchar(64) NOT NULL DEFAULT '', `store_in_global_index` tinyint NOT NULL DEFAULT 0, PRIMARY KEY (`id`), UNIQUE KEY `ux_column_id_name` (`column_id`, `name`), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index c7d2d54..545aabe 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -105,6 +105,7 @@ object Management { Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, + defaultValue, storeInGlobalIndex = storeInGlobalIndex, useCache = false) } } @@ -186,13 +187,14 @@ object Management { columnName: String, propsName: String, propsType: String, + defaultValue: String, storeInGlobalIndex: Boolean = false, schemaVersion: String = DEFAULT_VERSION): ColumnMeta = { val result = for { service <- Service.findByName(serviceName, useCache = false) serviceColumn <- ServiceColumn.find(service.id.get, columnName) } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, storeInGlobalIndex) + ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, defaultValue, storeInGlobalIndex) } result.getOrElse({ throw new RuntimeException(s"add property on vertex failed") @@ -350,10 +352,17 @@ class Management(graph: S2GraphLike) { } } +// def createServiceColumn(serviceName: String, +// columnName: String, +// columnType: String, +// props: java.util.List[Prop], +// schemaVersion: String = DEFAULT_VERSION): ServiceColumn = +// createServiceColumn(serviceName, columnName, columnType, props.toSeq, schemaVersion) + def createServiceColumn(serviceName: String, columnName: String, columnType: String, - props: java.util.List[Prop], + props: Seq[Prop], schemaVersion: String = DEFAULT_VERSION): ServiceColumn = { val serviceColumnTry = Model withTx { implicit session => @@ -365,7 +374,7 @@ class Management(graph: S2GraphLike) { for { Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, + ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, defaultValue, storeInGlobalIndex = storeInGlobalIndex, useCache = false) } serviceColumn @@ -374,7 +383,6 @@ class Management(graph: S2GraphLike) { serviceColumnTry.get } - def createLabel(labelName: String, srcColumn: ServiceColumn, tgtColumn: ServiceColumn, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala index 0efb851..1e0a95b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core import java.util.function.BiConsumer import org.apache.s2graph.core.S2Edge.Props -import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs} import org.apache.tinkerpop.gremlin.structure.Property @@ -33,6 +33,7 @@ object S2EdgePropertyHelper { edge.getPropsWithTs().put(key, newProp) newProp } + def updatePropsWithTs(edge: S2EdgeLike, others: Props = S2Edge.EmptyProps): Props = { val emptyProp = S2Edge.EmptyProps @@ -80,10 +81,12 @@ object S2EdgePropertyHelper { } } + def toLabelMetas(edge: S2EdgeLike, keys: Seq[String]): Seq[LabelMeta] = { for { key <- keys labelMeta <- edge.innerLabel.metaPropsInvMap.get(key) } yield labelMeta } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala index 07a9be1..64108db 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala @@ -69,30 +69,30 @@ object S2GraphFactory { val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false) val DefaultColumnMetas = { - ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", storeInGlobalIndex = true, useCache = false) - ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", "true", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", "0.0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", "0.0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "x", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "y", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "location", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "status", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "myId", "integer", "0", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "acl", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "some", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "this", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "that", "string", "-", storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(DefaultColumn.id.get, "any", "string", "-", storeInGlobalIndex = true, useCache = false) } // Management.deleteLabel("_s2graph") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala index 0d898d1..5612525 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala @@ -23,7 +23,7 @@ import java.util.function.{BiConsumer, Consumer} import org.apache.s2graph.core.S2Vertex.Props import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, Service, ServiceColumn} -import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.types.{InnerValLike, VertexId} import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex, VertexProperty} import play.api.libs.json.Json @@ -199,4 +199,19 @@ trait S2VertexLike extends Vertex with GraphElement { throw Vertex.Exceptions.vertexRemovalNotSupported() } } + + def propertyValue(key: String): Option[InnerValLike] = + S2VertexPropertyHelper.propertyValue(this, key) + + def propertyValueInner(columnMeta: ColumnMeta): InnerValLike = + S2VertexPropertyHelper.propertyValueInner(this, columnMeta) + + def propertyValues(keys: Seq[String] = Nil): Map[ColumnMeta, InnerValLike] = { + S2VertexPropertyHelper.propertyValuesInner(this, S2VertexPropertyHelper.toColumnMetas(this, keys)) + } + + def propertyValuesInner(columnMetas: Seq[ColumnMeta] = Nil): Map[ColumnMeta, InnerValLike] = { + S2VertexPropertyHelper.propertyValuesInner(this, columnMetas) + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala index 1fbc894..e0abfba 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala @@ -48,6 +48,9 @@ case class S2VertexProperty[V](element: S2VertexLike, innerVal.bytes } + + val valueAny = castValue(v, columnMeta.dataType) + val value = castValue(v, columnMeta.dataType).asInstanceOf[V] override def properties[U](strings: String*): util.Iterator[Property[U]] = ??? http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala new file mode 100644 index 0000000..bed69ef --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala @@ -0,0 +1,42 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.mysqls.ColumnMeta +import org.apache.s2graph.core.types.InnerValLike + +object S2VertexPropertyHelper { + def propertyValue(v: S2VertexLike, key: String): Option[InnerValLike] = { + key match { + case "id" => Option(v.innerId) + case _ => + v.serviceColumn.metasInvMap.get(key).map(x => propertyValueInner(v, x)) + } + } + + + def propertyValuesInner(vertex: S2VertexLike, columnMetas: Seq[ColumnMeta] = Nil): Map[ColumnMeta, InnerValLike] = { + if (columnMetas.isEmpty) { + vertex.serviceColumn.metaPropsDefaultMap.map { case (columnMeta, defaultVal) => + columnMeta -> propertyValueInner(vertex, columnMeta) + } + } else { + (ColumnMeta.reservedMetas ++ columnMetas).map { columnMeta => + columnMeta -> propertyValueInner(vertex, columnMeta) + }.toMap + } + } + + def propertyValueInner(vertex: S2VertexLike, columnMeta: ColumnMeta): InnerValLike = { + if (vertex.props.containsKey(columnMeta.name)) { + vertex.props.get(columnMeta.name).innerVal + } else { + vertex.serviceColumn.metaPropsDefaultMap(columnMeta) + } + } + + def toColumnMetas(vertex: S2VertexLike, keys: Seq[String]): Seq[ColumnMeta] = { + for { + key <- keys + columnMeta <- vertex.serviceColumn.metasInvMap.get(key) + } yield columnMeta + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala index 974965f..83159e2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala @@ -92,6 +92,7 @@ object Conversions { (JsPath \ "name").read[String] and (JsPath \ "seq").read[Byte] and (JsPath \ "dataType").read[String] and + (JsPath \ "defaultValue").read[String] and (JsPath \ "storeGlobalIndex").read[Boolean] )(ColumnMeta.apply _) @@ -101,6 +102,7 @@ object Conversions { (JsPath \ "name").write[String] and (JsPath \ "seq").write[Byte] and (JsPath \ "dataType").write[String] and + (JsPath \ "defaultValue").write[String] and (JsPath \ "storeGlobalIndex").write[Boolean] )(unlift(ColumnMeta.unapply)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala index b764841..f72f59e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala @@ -28,16 +28,19 @@ object ColumnMeta extends Model[ColumnMeta] { val timeStampSeq = -1.toByte val lastModifiedAtColumnSeq = 0.toByte - val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long") + val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long", "-1L") val maxValue = Byte.MaxValue - val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq.toByte, "long") + val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq.toByte, "long", "-1L") val reservedMetas = Seq(timestamp, lastModifiedAtColumn) val reservedMetaNamesSet = reservedMetas.map(_.name).toSet + def isValid(columnMeta: ColumnMeta): Boolean = + columnMeta.id.isDefined && columnMeta.id.get > 0 && columnMeta.seq >= 0 + def valueOf(rs: WrappedResultSet): ColumnMeta = { ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), - rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.boolean("store_in_global_index")) + rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.string("default_value"), rs.boolean("store_in_global_index")) } def findById(id: Int)(implicit session: DBSession = AutoSession) = { @@ -72,12 +75,12 @@ object ColumnMeta extends Model[ColumnMeta] { } } - def insert(columnId: Int, name: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = { + def insert(columnId: Int, name: String, dataType: String, defaultValue: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = { val ls = findAllByColumn(columnId, false) val seq = ls.size + 1 if (seq <= maxValue) { - sql"""insert into column_metas(column_id, name, seq, data_type, store_in_global_index) - select ${columnId}, ${name}, ${seq}, ${dataType}, ${storeInGlobalIndex}""" + sql"""insert into column_metas(column_id, name, seq, data_type, default_value, store_in_global_index) + select ${columnId}, ${name}, ${seq}, ${dataType}, ${defaultValue}, ${storeInGlobalIndex}""" .updateAndReturnGeneratedKey.apply() } } @@ -85,12 +88,13 @@ object ColumnMeta extends Model[ColumnMeta] { def findOrInsert(columnId: Int, name: String, dataType: String, + defaultValue: String, storeInGlobalIndex: Boolean = false, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = { findByName(columnId, name, useCache) match { case Some(c) => c case None => - insert(columnId, name, dataType, storeInGlobalIndex) + insert(columnId, name, dataType, defaultValue, storeInGlobalIndex) expireCache(s"columnId=$columnId:name=$name") findByName(columnId, name).get } @@ -150,6 +154,7 @@ case class ColumnMeta(id: Option[Int], name: String, seq: Byte, dataType: String, + defaultValue: String, storeInGlobalIndex: Boolean = false) { lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType) override def equals(other: Any): Boolean = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala index 286aa37..5b4f494 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -123,7 +123,6 @@ case class Service(id: Option[Int], lazy val extraOptions = Model.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig + def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache) def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions) - - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala index be1ae9a..819c378 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.core.mysqls +import org.apache.s2graph.core.JSONParser import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs} import play.api.libs.json.Json @@ -31,6 +32,14 @@ object ServiceColumn extends Model[ServiceColumn] { ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version")) } + def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = { + val cacheKey = "serviceId=" + serviceId + + lazy val sql = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply() + + if (useCache) withCaches(cacheKey)(sql) + else sql + } def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = { val cacheKey = "id=" + id @@ -106,6 +115,9 @@ case class ServiceColumn(id: Option[Int], lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap + lazy val metaPropsDefaultMap = metas.map { meta => + meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion) + }.toMap lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType) def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala index 49dda2a..b19fb2b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala @@ -57,7 +57,7 @@ object RocksHelper { } else 0L } - case class ScanWithRange(queryParam: QueryParam, startKey: Array[Byte], stopKey: Array[Byte], offset: Int, limit: Int) + case class ScanWithRange(cf: Array[Byte], startKey: Array[Byte], stopKey: Array[Byte], offset: Int, limit: Int) case class GetRequest(cf: Array[Byte], key: Array[Byte]) type RocksRPC = Either[GetRequest, ScanWithRange] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala index 2d6b542..e575c5b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala @@ -72,7 +72,7 @@ class RocksStorageReadable(val graph: S2GraphLike, (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) } - Right(ScanWithRange(queryParam, startKey, stopKey, queryParam.offset, queryParam.limit)) + Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit)) case Some(tgtId) => // snapshotEdge val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head @@ -81,8 +81,12 @@ class RocksStorageReadable(val graph: S2GraphLike, } private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = { - val kv = serDe.vertexSerializer(vertex).toKeyValues.head - Left(GetRequest(SKeyValue.VertexCf, kv.row)) + val startKey = vertex.id.bytes + val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue)) + + Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue)) +// val kv = serDe.vertexSerializer(vertex).toKeyValues.head +// Left(GetRequest(SKeyValue.VertexCf, kv.row)) } override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { @@ -119,16 +123,18 @@ class RocksStorageReadable(val graph: S2GraphLike, else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis())) Future.successful(kvs) - case Right(ScanWithRange(queryParam, startKey, stopKey, offset, limit)) => + case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) => + val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db val kvs = new ArrayBuffer[SKeyValue]() - val iter = db.newIterator() + val iter = _db.newIterator() + try { var idx = 0 iter.seek(startKey) - val (startOffset, len) = (queryParam.innerOffset, queryParam.innerLimit) + val (startOffset, len) = (offset, limit) while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) { if (idx >= startOffset) { - kvs += SKeyValue(table, iter.key, SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) + kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis()) } iter.next() @@ -137,7 +143,6 @@ class RocksStorageReadable(val graph: S2GraphLike, } finally { iter.close() } - Future.successful(kvs) } } @@ -172,24 +177,45 @@ class RocksStorageReadable(val graph: S2GraphLike, } override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + import scala.collection.mutable + val vertices = new ArrayBuffer[S2VertexLike]() ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) => val distinctColumns = columns.toSet val iter = vdb.newIterator() + val buffer = mutable.ListBuffer.empty[SKeyValue] + var oldVertexIdBytes = Array.empty[Byte] + var minusPos = 0 + try { iter.seekToFirst() while (iter.isValid) { + val row = iter.key() + if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) { + if (buffer.nonEmpty) + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + + oldVertexIdBytes = row + minusPos = 1 + buffer.clear() + } val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis()) - - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) - .filter(v => distinctColumns(v.serviceColumn)) - .foreach { vertex => - vertices += vertex - } + buffer += kv iter.next() } + if (buffer.nonEmpty) + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + } finally { iter.close() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala index 5576017..2a7cd6a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala @@ -23,7 +23,7 @@ import org.apache.s2graph.core.mysqls.ColumnMeta import org.apache.s2graph.core.storage.CanSKeyValue import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.storage.serde.StorageDeserializable._ -import org.apache.s2graph.core.types.{HBaseType, InnerValLike, VertexId} +import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, VertexId} import org.apache.s2graph.core.{S2Graph, S2GraphLike, S2Vertex, S2VertexLike} class VertexDeserializable(graph: S2GraphLike, @@ -32,19 +32,19 @@ class VertexDeserializable(graph: S2GraphLike, def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = { try { - assert(_kvs.size == 1) - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } val kv = kvs.head val version = HBaseType.DEFAULT_VERSION - val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) + val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length - 1, version) val serviceColumn = vertexId.column val schemaVer = serviceColumn.schemaVersion - val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, serviceColumn) - val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike] - props.foreach { case (columnMeta, innerVal) => + kvs.foreach { kv => + val columnMetaSeq = kv.row.last + val columnMeta = serviceColumn.metasMap(columnMetaSeq) + val (innerVal, _) = InnerVal.fromBytes(kv.value, 0, kv.value.length, schemaVer) + propsMap += (columnMeta -> innerVal) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala index 6e82b87..476311c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.core.storage.serde.vertex.tall +import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.{S2Vertex, S2VertexLike} import org.apache.s2graph.core.storage.SKeyValue import org.apache.s2graph.core.storage.serde.Serializable @@ -44,11 +45,18 @@ case class VertexSerializable(vertex: S2VertexLike, intToBytes: Int => Array[Byt /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ override def toKeyValues: Seq[SKeyValue] = { - val row = toRowKey - val qualifier = toQualifier - val value = toValue - Seq( - SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) - ) +// val row = toRowKey +// val qualifier = toQualifier +// val value = toValue +// Seq( +// SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) +// ) + (vertex.props.asScala ++ vertex.defaultProps.asScala).toSeq.map { case (_, v) => + val row = Bytes.add(vertex.id.bytes, Array.fill(1)(v.columnMeta.seq)) + val qualifier = Array.empty[Byte] + val value = v.innerVal.bytes + + SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala index 820f93a..c96231a 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala @@ -92,10 +92,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { } } - val vertexPropsKeys = List("age" -> "integer", "im" -> "string") + val vertexPropsKeys = List( + ("age", "integer", "0"), + ("im", "string", "-") + ) - vertexPropsKeys.map { case (key, keyType) => - Management.addVertexProp(testServiceName, testColumnName, key, keyType, storeInGlobalIndex = true) + vertexPropsKeys.map { case (key, keyType, defaultValue) => + Management.addVertexProp(testServiceName, testColumnName, key, keyType, defaultValue, storeInGlobalIndex = true) } // vertex type global index. http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index ca78917..f52f86a 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -120,30 +120,30 @@ class S2GraphProvider extends AbstractGraphProvider { if (testClass.getSimpleName == "PropertyFeatureSupportTest") { knowsProp = knowsProp.filterNot(_.name == "aKey") - val dataType = if (testName.toLowerCase.contains("boolean")) { + val (dataType, defaultValue) = if (testName.toLowerCase.contains("boolean")) { knowsProp = knowsProp.filterNot(_.name == "aKey") :+ Prop("aKey", "false", "boolean") - "boolean" + ("boolean", "false") } else if (testName.toLowerCase.contains("integer")) { knowsProp = knowsProp.filterNot(_.name == "aKey") :+ Prop("aKey", "0", "integer") - "integer" + ("integer", "0") } else if (testName.toLowerCase.contains("long")) { knowsProp = knowsProp.filterNot(_.name == "aKey") :+ Prop("aKey", "0", "long") - "long" + ("long", "0") } else if (testName.toLowerCase.contains("double")) { knowsProp = knowsProp.filterNot(_.name == "aKey") :+ Prop("aKey", "0.0", "double") - "double" + ("double", "0.0") } else if (testName.toLowerCase.contains("float")) { knowsProp = knowsProp.filterNot(_.name == "aKey") :+ Prop("aKey", "0.0", "float") - "float" + ("float", "0.0") } else if (testName.toLowerCase.contains("string")) { knowsProp = knowsProp.filterNot(_.name == "aKey") :+ Prop("aKey", "-", "string") - "string" + ("string", "-") } else { - "string" + ("string", "-") } ColumnMeta.findByName(defaultServiceColumn.id.get, "aKey", useCache = false).foreach(cm => ColumnMeta.delete(cm.id.get)) - ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, storeInGlobalIndex = true, useCache = false) + ColumnMeta.findOrInsert(defaultServiceColumn.id.get, "aKey", dataType, defaultValue, storeInGlobalIndex = true, useCache = false) } if (loadGraphWith != null && loadGraphWith.value() == GraphData.MODERN) { mnt.createLabel("knows", defaultService.serviceName, "person", "integer", defaultService.serviceName, "person", "integer", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2graphql/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/resources/application.conf b/s2graphql/src/main/resources/application.conf index 45eb1b7..335714e 100644 --- a/s2graphql/src/main/resources/application.conf +++ b/s2graphql/src/main/resources/application.conf @@ -22,4 +22,12 @@ akka { loglevel = "INFO" } +//db.default.password = sa +//db.default.user = sa +//s2graph.storage.backend = rocks +//rocks.storage.file.path = rocks_db +//rocks.storage.mode = production +//rocks.storage.ttl = -1 +//rocks.storage.read.only = false + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2graphql/src/main/scala/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/GraphQLServer.scala b/s2graphql/src/main/scala/GraphQLServer.scala index c2e9ca1..1d173a8 100644 --- a/s2graphql/src/main/scala/GraphQLServer.scala +++ b/s2graphql/src/main/scala/GraphQLServer.scala @@ -85,7 +85,7 @@ object GraphQLServer { val s2Type = new S2Type(s2Repository) val newSchema = new SchemaDef(s2Type).S2GraphSchema - println(SchemaRenderer.renderSchema(newSchema)) +// println(SchemaRenderer.renderSchema(newSchema)) println("-" * 80) newSchema http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2graphql/src/main/scala/GraphRepository.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/GraphRepository.scala b/s2graphql/src/main/scala/GraphRepository.scala index d4c910a..cd7d1df 100644 --- a/s2graphql/src/main/scala/GraphRepository.scala +++ b/s2graphql/src/main/scala/GraphRepository.scala @@ -37,18 +37,27 @@ import scala.util.{Failure, Success, Try} * * @param graph */ -class GraphRepository(graph: S2GraphLike) { +class GraphRepository(val graph: S2GraphLike) { val management = graph.management val parser = new RequestParser(graph) implicit val ec = graph.ec - def partialVertexParamToVertex(column: ServiceColumn, param: PartialVertexParam): S2VertexLike = { + def partialServiceParamToVertex(column: ServiceColumn, param: PartialServiceParam): S2VertexLike = { val vid = JSONParser.jsValueToInnerVal(param.vid, column.columnType, column.schemaVersion).get graph.toVertex(param.service.serviceName, column.columnName, vid) } + def partialVertexParamToS2Vertex(serviceName: String, columnName: String, param: PartialVertexParam): S2VertexLike = { + graph.toVertex( + serviceName = serviceName, + columnName = columnName, + id = param.id, + props = param.props, + ts = param.ts) + } + def partialEdgeParamToS2Edge(labelName: String, param: PartialEdgeParam): S2EdgeLike = { graph.toEdge( srcId = param.from, @@ -59,13 +68,39 @@ class GraphRepository(graph: S2GraphLike) { ) } + def addVertex(args: Args): Future[Option[MutateResponse]] = { + val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => + val innerMap = args.arg[Vector[PartialServiceVertexParam]](serviceName) + val ret = innerMap.map { param => + partialVertexParamToS2Vertex(serviceName, param.columnName, param.vertexParam) + } + + ret + } + + graph.mutateVertices(vertices, withWait = true).map(_.headOption) + } + + def addVertices(args: Args): Future[Seq[MutateResponse]] = { + val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => + val innerMap = args.arg[Map[String, Vector[PartialVertexParam]]](serviceName) + + innerMap.flatMap { case (columnName, params) => + params.map { param => + partialVertexParamToS2Vertex(serviceName, columnName, param) + } + } + } + graph.mutateVertices(vertices, withWait = true) + } + def addEdges(args: Args): Future[Seq[MutateResponse]] = { val edges: Seq[S2EdgeLike] = args.raw.keys.toList.flatMap { labelName => val params = args.arg[Vector[PartialEdgeParam]](labelName) params.map(param => partialEdgeParamToS2Edge(labelName, param)) } - graph.mutateEdges(edges) + graph.mutateEdges(edges, withWait = true) } def addEdge(args: Args): Future[Option[MutateResponse]] = { @@ -74,7 +109,15 @@ class GraphRepository(graph: S2GraphLike) { partialEdgeParamToS2Edge(labelName, param) } - graph.mutateEdges(edges).map(_.headOption) + graph.mutateEdges(edges, withWait = true).map(_.headOption) + } + + def getVertex(vertex: S2VertexLike): Future[Seq[S2VertexLike]] = { + val f = graph.getVertices(Seq(vertex)) + f.foreach{ a => + println(a) + } + f } def getEdges(vertex: S2VertexLike, label: Label, _dir: String): Future[Seq[S2EdgeLike]] = { @@ -110,6 +153,15 @@ class GraphRepository(graph: S2GraphLike) { } } + def createServiceColumn(args: Args): Try[ServiceColumn] = { + val serviceName = args.arg[String]("serviceName") + val columnName = args.arg[String]("columnName") + val columnType = args.arg[String]("columnType") + val props = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty) + + Try { management.createServiceColumn(serviceName, columnName, columnType, props) } + } + def createLabel(args: Args): Try[Label] = { val labelName = args.arg[String]("name") @@ -155,6 +207,8 @@ class GraphRepository(graph: S2GraphLike) { def allServices: List[Service] = Service.findAll() + def allServiceColumns: List[ServiceColumn] = ServiceColumn.findAll() + def findServiceByName(name: String): Option[Service] = Service.findByName(name) def allLabels: List[Label] = Label.findAll() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2graphql/src/main/scala/S2Type.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/S2Type.scala b/s2graphql/src/main/scala/S2Type.scala index 59eae4c..1fae450 100644 --- a/s2graphql/src/main/scala/S2Type.scala +++ b/s2graphql/src/main/scala/S2Type.scala @@ -23,6 +23,7 @@ import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.MutateResponse +import org.apache.s2graph.core.utils.logger import play.api.libs.json.JsValue import sangria.marshalling.{CoercedScalaResultMarshaller, FromInput} import sangria.schema._ @@ -38,7 +39,13 @@ object S2Type { case class MutationResponse[T](result: Try[T]) - case class PartialVertexParam(service: Service, vid: JsValue) + case class PartialServiceParam(service: Service, vid: JsValue) + + case class PartialVertexParam(ts: Long, + id: Any, + props: Map[String, Any]) + + case class PartialServiceVertexParam(columnName: String, vertexParam: PartialVertexParam) case class PartialEdgeParam(ts: Long, from: Any, @@ -46,6 +53,43 @@ object S2Type { direction: String, props: Map[String, Any]) + implicit object PartialServiceVertexParamFromInput extends FromInput[Vector[PartialServiceVertexParam]] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val inputMap = node.asInstanceOf[Map[String, marshaller.Node]] + + val ret = inputMap.toVector.map { case (columnName, node) => + val param = PartialVertexFromInput.fromResult(node) + PartialServiceVertexParam(columnName, param) + } + + ret + } + } + + implicit object PartialVertexFromInput extends FromInput[PartialVertexParam] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + + val _inputMap = node.asInstanceOf[Option[Map[String, Any]]] + val inputMap = _inputMap.get + + val id = inputMap("id") + val ts = inputMap.get("timestamp") match { + case Some(Some(v)) => v.asInstanceOf[Long] + case _ => System.currentTimeMillis() + } + val props = inputMap.get("props") match { + case Some(Some(v)) => v.asInstanceOf[Map[String, Option[Any]]].filter(_._2.isDefined).mapValues(_.get) + case _ => Map.empty[String, Any] + } + + PartialVertexParam(ts, id, props) + } + } + implicit object PartialEdgeFromInput extends FromInput[PartialEdgeParam] { val marshaller = CoercedScalaResultMarshaller.default @@ -124,6 +168,12 @@ class S2Type(repo: GraphRepository) { lazy val ServiceNameArg = Argument("name", OptionInputType(ServiceListType), description = "desc here") + lazy val ServiceNameRawArg = Argument("serviceName", ServiceListType, description = "desc here") + + lazy val ColumnNameArg = Argument("columnName", OptionInputType(ServiceColumnListType), description = "desc here") + + lazy val ColumnTypeArg = Argument("columnType", DataTypeType, description = "desc here") + lazy val LabelNameArg = Argument("name", OptionInputType(LabelListType), description = "desc here") lazy val PropArg = Argument("props", OptionInputType(ListInputType(InputPropType)), description = "desc here") @@ -133,7 +183,20 @@ class S2Type(repo: GraphRepository) { lazy val ServiceType = deriveObjectType[GraphRepository, Service]( ObjectTypeName("Service"), ObjectTypeDescription("desc here"), - RenameField("serviceName", "name") + RenameField("serviceName", "name"), + AddFields( + Field("serviceColumns", ListType(ServiceColumnType), resolve = c => c.value.serviceColumns(false).toList) + ) + ) + + lazy val ServiceColumnType = deriveObjectType[GraphRepository, ServiceColumn]( + ObjectTypeName("ServiceColumn"), + ObjectTypeDescription("desc here"), + RenameField("columnName", "name"), + AddFields( + Field("props", ListType(ColumnMetaType), + resolve = c => c.value.metas.filter(ColumnMeta.isValid)) + ) ) lazy val LabelMetaType = deriveObjectType[GraphRepository, LabelMeta]( @@ -141,6 +204,11 @@ class S2Type(repo: GraphRepository) { ExcludeFields("seq", "labelId") ) + lazy val ColumnMetaType = deriveObjectType[GraphRepository, ColumnMeta]( + ObjectTypeName("ColumnMeta"), + ExcludeFields("seq", "columnId") + ) + lazy val DataTypeType = EnumType( "DataType", description = Option("desc here"), @@ -192,6 +260,15 @@ class S2Type(repo: GraphRepository) { } ) + lazy val ServiceColumnListType = EnumType( + s"ServiceColumnList", + description = Option("desc here"), + values = + dummyEnum +: repo.allServiceColumns.map { serviceColumn => + EnumValue(serviceColumn.columnName, value = serviceColumn.columnName) + } + ) + lazy val LabelListType = EnumType( s"LabelList", description = Option("desc here"), @@ -245,6 +322,29 @@ class S2Type(repo: GraphRepository) { RenameField("label", "name") ) + def makeInputPartialVertexParamType(service: Service, + serviceColumn: ServiceColumn): InputObjectType[PartialVertexParam] = { + lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( + s"${service.serviceName}_${serviceColumn.columnName}_props", + description = "desc here", + () => serviceColumn.metas.filter(ColumnMeta.isValid).map { lm => + InputField(lm.name, OptionInputType(s2TypeToScalarType(lm.dataType))) + } + ) + + lazy val fields = List( + InputField("_", OptionInputType(LongType)) + ) + + InputObjectType[PartialVertexParam]( + s"${service.serviceName}_${serviceColumn.columnName}_mutate", + description = "desc here", + () => + if (!serviceColumn.metas.exists(ColumnMeta.isValid)) fields + else List(InputField("props", OptionInputType(InputPropsType))) + ) + } + def makeInputPartialEdgeParamType(label: Label): InputObjectType[PartialEdgeParam] = { lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( s"${label.label}_props", @@ -270,6 +370,47 @@ class S2Type(repo: GraphRepository) { ) } + lazy val VertexArg = repo.allServices.map { service => + val columnArgs = service.serviceColumns(false).map { serviceColumn => + val dummyField = InputField("_", OptionInputType(LongType)) + + // val inputParialVertexParamType = makeInputPartialVertexParamType(service, serviceColumn) + lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( + s"${service.serviceName}_${serviceColumn.columnName}_props", + description = "desc here", + () => dummyField +: serviceColumn.metas.filter(ColumnMeta.isValid).map { lm => + InputField(lm.name, OptionInputType(s2TypeToScalarType(lm.dataType))) + } + ) + + val tpe = InputObjectType[PartialServiceVertexParam]( + serviceColumn.columnName, + fields = List( + InputField("id", s2TypeToScalarType(serviceColumn.columnType)), + InputField("timestamp", OptionInputType(LongType)), + InputField("props", OptionInputType(InputPropsType)) + ) + ) + + InputField(serviceColumn.columnName, OptionInputType(tpe)) + } + + val vertexParamType = InputObjectType[Vector[PartialServiceVertexParam]]( + s"${service.serviceName}_column", + description = "desc here", + fields = columnArgs.toList + ) + + Argument(service.serviceName, OptionInputType(vertexParamType)) + } + + lazy val verticesArg = repo.allServices.flatMap { service => + service.serviceColumns(false).map { serviceColumn => + val inputParialVertexParamType = makeInputPartialVertexParamType(service, serviceColumn) + Argument(serviceColumn.columnName, OptionInputType(ListInputType(inputParialVertexParamType))) + } + } + lazy val EdgeArg = repo.allLabels.map { label => val inputPartialEdgeParamType = makeInputPartialEdgeParamType(label) Argument(label.label, OptionInputType(inputPartialEdgeParamType)) @@ -288,6 +429,7 @@ class S2Type(repo: GraphRepository) { "hTableTTL" -> IntType ).map { case (name, _type) => Argument(name, OptionInputType(_type)) } + lazy val labelRequiredArg = List( "sourceService" -> InputLabelServiceType, "targetService" -> InputLabelServiceType @@ -307,14 +449,20 @@ class S2Type(repo: GraphRepository) { ServiceType ) + lazy val ServiceColumnMutationResponseType = makeMutationResponseType[ServiceColumn]( + "CreateServiceColumn", + "desc here", + ServiceColumnType + ) + lazy val LabelMutationResponseType = makeMutationResponseType[Label]( "CreateLabel", "desc here", LabelType ) - lazy val EdgeMutateResponseType = deriveObjectType[GraphRepository, MutateResponse]( - ObjectTypeName("EdgeMutateResponse"), + lazy val MutateResponseType = deriveObjectType[GraphRepository, MutateResponse]( + ObjectTypeName("MutateResponse"), ObjectTypeDescription("desc here"), AddFields( Field("isSuccess", BooleanType, resolve = c => c.value.isSuccess) @@ -350,7 +498,8 @@ class S2Type(repo: GraphRepository) { PlayJsonPolyType.PolyType, description = Some("desc here"), resolve = _.value match { - case v: PartialVertexParam => v.vid + case v: PartialServiceParam => v.vid + case vertex: S2VertexLike => JSONParser.innerValToJsValue(vertex.innerId, vertex.serviceColumn.columnType).get case _ => throw new RuntimeException("dead code") } ) @@ -360,24 +509,23 @@ class S2Type(repo: GraphRepository) { LongType, description = Option("desc here"), resolve = _.value match { + case v: S2VertexLike => v.ts case e: S2EdgeLike => e.ts case _ => throw new RuntimeException("dead code") }) - def makeEdgePropFields(edgeFieldNameWithTypes: List[(String, String)]): List[Field[GraphRepository, Any]] = { + def makePropFields(edgeFieldNameWithTypes: List[(String, String)]): List[Field[GraphRepository, Any]] = { def makeField[A](name: String, cType: String, tpe: ScalarType[A]): Field[GraphRepository, Any] = - Field(name, OptionType(tpe), description = Option("desc here"), resolve = _.value match { - case e: S2EdgeLike => - val innerVal = name match { - case "from" => e.srcForVertex.innerId - case "to" => e.tgtForVertex.innerId - case _ => e.propertyValue(name).get.innerVal - } - - JSONParser.innerValToAny(innerVal, cType).asInstanceOf[A] - - case _ => throw new RuntimeException("dead code") - }) + Field(name, + OptionType(tpe), + description = Option("desc here"), + resolve = _.value match { + case v: S2VertexLike => v.propertyValue(name).get + case e: S2EdgeLike => + val innerVal = e.propertyValue(name).get.innerVal + JSONParser.innerValToAny(innerVal, cType).asInstanceOf[A] + case _ => throw new RuntimeException("Error !!!!") + }) edgeFieldNameWithTypes.map { case (cName, cType) => cType match { @@ -393,64 +541,109 @@ class S2Type(repo: GraphRepository) { // ex: KakaoFavorites lazy val serviceVertexFields: List[Field[GraphRepository, Any]] = repo.allServices.map { service => - val serviceId = service.id.get - val connectedLabels = repo.allLabels.filter { lb => - lb.srcServiceId == serviceId || lb.tgtServiceId == serviceId - }.distinct + val columnsOnService = service.serviceColumns(false) // label connected on services, friends, post - lazy val connectedLabelFields: List[Field[GraphRepository, Any]] = connectedLabels.map { label => - val labelColumns = List("from" -> label.srcColumnType, "to" -> label.tgtColumnType) - val labelProps = label.labelMetas.map { lm => lm.name -> lm.dataType } - - lazy val EdgeType = ObjectType(label.label, () => fields[GraphRepository, Any](edgeFields ++ connectedLabelFields: _*)) - lazy val edgeFields: List[Field[GraphRepository, Any]] = tsField :: makeEdgePropFields(labelColumns ++ labelProps) - lazy val edgeTypeField: Field[GraphRepository, Any] = Field( - label.label, - ListType(EdgeType), - arguments = DirArg :: Nil, - description = Some("edges"), - resolve = { c => - val dir = c.argOpt("direction").getOrElse("out") - - val vertex: S2VertexLike = c.value match { - case v: S2VertexLike => v - case e: S2Edge => if (dir == "out") e.tgtVertex else e.srcVertex - case vp: PartialVertexParam => - if (dir == "out") c.ctx.partialVertexParamToVertex(label.tgtColumn, vp) - else c.ctx.partialVertexParamToVertex(label.srcColumn, vp) - } + lazy val ServiceColumnOnServiceType = ObjectType( + s"${service.serviceName}", + fields[GraphRepository, Any](columnsOnService.toList.map { column => + val connectedLabels = repo.allLabels.filter { lb => + column.id.get == lb.srcColumn.id.get || column.id.get == lb.tgtColumn.id.get + }.distinct + + lazy val connectedLabelFields: List[Field[GraphRepository, Any]] = connectedLabels.map { label => + val labelColumns = List("from" -> label.srcColumnType, "to" -> label.tgtColumnType) + val labelProps = label.labelMetas.map { lm => lm.name -> lm.dataType } + + lazy val EdgeType = ObjectType(label.label, () => fields[GraphRepository, Any](edgeFields ++ connectedLabelFields: _*)) + lazy val edgeFields: List[Field[GraphRepository, Any]] = tsField :: makePropFields(labelColumns ++ labelProps) + lazy val edgeTypeField: Field[GraphRepository, Any] = Field( + label.label, + ListType(EdgeType), + arguments = DirArg :: Nil, + description = Some("edges"), + resolve = { c => + val dir = c.argOpt("direction").getOrElse("out") + + val vertex: S2VertexLike = c.value match { + case v: S2VertexLike => v + case e: S2Edge => if (dir == "out") e.tgtVertex else e.srcVertex + case vp: PartialServiceParam => + if (dir == "out") c.ctx.partialServiceParamToVertex(label.tgtColumn, vp) + else c.ctx.partialServiceParamToVertex(label.srcColumn, vp) + } + + c.ctx.getEdges(vertex, label, dir) + } + ) + + edgeTypeField + } + + lazy val EdgeType = ObjectType("_", () => fields[GraphRepository, Any](edgeTypeFieldDummy)) + lazy val edgeTypeFieldDummy: Field[GraphRepository, Any] = Field( + "_", + ListType(EdgeType), + arguments = DirArg :: Nil, + description = Some("dummy edge"), + resolve = { c => Nil } + ) - c.ctx.getEdges(vertex, label, dir) + lazy val columnMetasKv = column.metas.filter(ColumnMeta.isValid).map { columnMeta => + columnMeta.name -> columnMeta.dataType } - ) - edgeTypeField - } + val vertexPropFields = makePropFields(columnMetasKv) + lazy val edgeOnColumnType = ObjectType( + s"${service.serviceName}_on_${column.columnName}", + () => fields[GraphRepository, Any](List(edgeTypeFieldDummy, tsField, vertexIdField) ++ vertexPropFields ++ connectedLabelFields: _*) + ) - lazy val VertexType = ObjectType( - s"${service.serviceName}", - fields[GraphRepository, Any](vertexIdField +: connectedLabelFields: _*) + Field(column.columnName, + ListType(edgeOnColumnType), + arguments = List( + Argument("id", OptionInputType(PlayJsonPolyType.PolyType)), + Argument("ids", OptionInputType(ListInputType(PlayJsonPolyType.PolyType))), + Argument("search", OptionInputType(ListInputType(StringType))) + ), + description = Option("desc here"), + resolve = c => { + val id = c.argOpt[JsValue]("id").toSeq + val ids = c.argOpt[List[JsValue]]("ids").toList.flatten + val svc = c.ctx.findServiceByName(service.serviceName).get + + val vids = (id ++ ids).map { vid => + val vp = PartialServiceParam(svc, vid) + c.ctx.partialServiceParamToVertex(column, vp) + } + + repo.getVertex(vids.head) + } + ): Field[GraphRepository, Any] + }: _*) ) Field( service.serviceName, - ListType(VertexType), - arguments = List( - Argument("id", OptionInputType(PlayJsonPolyType.PolyType)), - Argument("ids", OptionInputType(ListInputType(PlayJsonPolyType.PolyType))) - ), + ServiceColumnOnServiceType, description = Some(s"serviceName: ${service.serviceName}"), resolve = { c => - val id = c.argOpt[JsValue]("id").toSeq - val ids = c.argOpt[List[JsValue]]("ids").toList.flatten - val svc = c.ctx.findServiceByName(service.serviceName).get + // val id = c.argOpt[JsValue]("id").toSeq + // val ids = c.argOpt[List[JsValue]]("ids").toList.flatten + // val svc = c.ctx.findServiceByName(service.serviceName).get + // + // (id ++ ids).map { vid => PartialServiceParam(svc, vid) } - (id ++ ids).map { vid => PartialVertexParam(svc, vid) } + service } ): Field[GraphRepository, Any] } + + /** + * Management query + */ + lazy val serviceField: Field[GraphRepository, Any] = Field( "Services", ListType(ServiceType), @@ -464,6 +657,19 @@ class S2Type(repo: GraphRepository) { } ) + lazy val serviceColumnField: Field[GraphRepository, Any] = Field( + "ServiceColumn", + ListType(ServiceColumnType), + description = Option("desc here"), + arguments = List(ServiceNameRawArg, ColumnNameArg, PropArg), + resolve = { c => + c.argOpt[String]("name") match { + case Some(name) => c.ctx.allServiceColumns.filter(_.columnName == name) + case None => c.ctx.allServiceColumns + } + } + ) + lazy val labelField: Field[GraphRepository, Any] = Field( "Labels", ListType(LabelType), @@ -500,18 +706,33 @@ class S2Type(repo: GraphRepository) { arguments = NameArg :: serviceOptArgs, resolve = c => MutationResponse(c.ctx.createService(c.args)) ), + Field("createServiceColumn", + ServiceColumnMutationResponseType, + arguments = List(ServiceNameRawArg, Argument("columnName", StringType), ColumnTypeArg, PropArg), + resolve = c => MutationResponse(c.ctx.createServiceColumn(c.args)) + ), Field("createLabel", LabelMutationResponseType, arguments = NameArg :: PropArg :: IndicesArg :: labelRequiredArg ::: labelOptsArgs, resolve = c => MutationResponse(c.ctx.createLabel(c.args)) ), + Field("addVertex", + OptionType(MutateResponseType), + arguments = VertexArg, + resolve = c => c.ctx.addVertex(c.args) + ), + Field("addVertices", + ListType(MutateResponseType), + arguments = verticesArg, + resolve = c => c.ctx.addVertices(c.args) + ), Field("addEdge", - OptionType(EdgeMutateResponseType), + OptionType(MutateResponseType), arguments = EdgeArg, resolve = c => c.ctx.addEdge(c.args) ), Field("addEdges", - ListType(EdgeMutateResponseType), + ListType(MutateResponseType), arguments = EdgesArg, resolve = c => c.ctx.addEdges(c.args) ) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/50c9eaf8/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala index 2ee6395..23bda0a 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala @@ -317,7 +317,7 @@ object AdminController extends Controller { serviceColumn <- ServiceColumn.find(service.id.get, columnName) prop <- requestParser.toPropElements(js).toOption } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.dataType, storeInGlobalIndex) + ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.dataType, prop.defaultValue, storeInGlobalIndex) } }
