Repository: incubator-s2graph
Updated Branches:
  refs/heads/master f1e6be2ff -> 8dcb0f15d
- addVertex.
- createServiceColumn.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4aaebbf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4aaebbf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4aaebbf5

Branch: refs/heads/master
Commit: 4aaebbf51b9d90e876b67cc7aa4cae49a0ffca56
Parents: f1e6be2
Author: DO YUNG YOON <[email protected]>
Authored: Mon Feb 26 18:22:31 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Feb 26 18:22:31 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/Management.scala | 10 +-
 .../apache/s2graph/core/mysqls/ColumnMeta.scala | 3 +
 .../apache/s2graph/core/mysqls/Service.scala | 3 +-
 .../s2graph/core/mysqls/ServiceColumn.scala | 8 +
 s2graphql/src/main/resources/application.conf | 8 +
 s2graphql/src/main/scala/GraphQLServer.scala | 2 +-
 s2graphql/src/main/scala/GraphRepository.scala | 53 +++++-
 s2graphql/src/main/scala/S2Type.scala | 186 +++++++++++++++++--
 8 files changed, 254 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index c7d2d54..f37d964 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -350,10 +350,17 @@ class Management(graph: S2GraphLike) { } } +// def createServiceColumn(serviceName: String, +// columnName: String, +// columnType: String, +// props: java.util.List[Prop], +// schemaVersion: String = DEFAULT_VERSION): 
ServiceColumn = +// createServiceColumn(serviceName, columnName, columnType, props.toSeq, schemaVersion) + def createServiceColumn(serviceName: String, columnName: String, columnType: String, - props: java.util.List[Prop], + props: Seq[Prop], schemaVersion: String = DEFAULT_VERSION): ServiceColumn = { val serviceColumnTry = Model withTx { implicit session => @@ -374,7 +381,6 @@ class Management(graph: S2GraphLike) { serviceColumnTry.get } - def createLabel(labelName: String, srcColumn: ServiceColumn, tgtColumn: ServiceColumn, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala index b764841..aed237d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala @@ -35,6 +35,9 @@ object ColumnMeta extends Model[ColumnMeta] { val reservedMetas = Seq(timestamp, lastModifiedAtColumn) val reservedMetaNamesSet = reservedMetas.map(_.name).toSet + def isValid(columnMeta: ColumnMeta): Boolean = + columnMeta.id.isDefined && columnMeta.id.get > 0 && columnMeta.seq >= 0 + def valueOf(rs: WrappedResultSet): ColumnMeta = { ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase(), rs.boolean("store_in_global_index")) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala index 286aa37..dd1e87c 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -123,7 +123,6 @@ case class Service(id: Option[Int], lazy val extraOptions = Model.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig + lazy val serviceColumns: Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = true) def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions) - - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala index be1ae9a..0f56fd8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -31,6 +31,14 @@ object ServiceColumn extends Model[ServiceColumn] { ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version")) } + def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = { + val cacheKey = "serviceId=" + serviceId + + lazy val sql = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply() + + if (useCache) withCaches(cacheKey)(sql) + else sql + } def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = { val cacheKey = "id=" + id http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2graphql/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git 
a/s2graphql/src/main/resources/application.conf b/s2graphql/src/main/resources/application.conf index 45eb1b7..335714e 100644 --- a/s2graphql/src/main/resources/application.conf +++ b/s2graphql/src/main/resources/application.conf @@ -22,4 +22,12 @@ akka { loglevel = "INFO" } +//db.default.password = sa +//db.default.user = sa +//s2graph.storage.backend = rocks +//rocks.storage.file.path = rocks_db +//rocks.storage.mode = production +//rocks.storage.ttl = -1 +//rocks.storage.read.only = false + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2graphql/src/main/scala/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/GraphQLServer.scala b/s2graphql/src/main/scala/GraphQLServer.scala index c2e9ca1..1d173a8 100644 --- a/s2graphql/src/main/scala/GraphQLServer.scala +++ b/s2graphql/src/main/scala/GraphQLServer.scala @@ -85,7 +85,7 @@ object GraphQLServer { val s2Type = new S2Type(s2Repository) val newSchema = new SchemaDef(s2Type).S2GraphSchema - println(SchemaRenderer.renderSchema(newSchema)) +// println(SchemaRenderer.renderSchema(newSchema)) println("-" * 80) newSchema http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2graphql/src/main/scala/GraphRepository.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/GraphRepository.scala b/s2graphql/src/main/scala/GraphRepository.scala index d4c910a..1dd7707 100644 --- a/s2graphql/src/main/scala/GraphRepository.scala +++ b/s2graphql/src/main/scala/GraphRepository.scala @@ -44,11 +44,20 @@ class GraphRepository(graph: S2GraphLike) { implicit val ec = graph.ec - def partialVertexParamToVertex(column: ServiceColumn, param: PartialVertexParam): S2VertexLike = { + def partialServiceParamToVertex(column: ServiceColumn, param: PartialServiceParam): S2VertexLike = { val vid = JSONParser.jsValueToInnerVal(param.vid, column.columnType, 
column.schemaVersion).get graph.toVertex(param.service.serviceName, column.columnName, vid) } + def partialVertexParamToS2Vertex(serviceName: String, columnName: String, param: PartialVertexParam): S2VertexLike = { + graph.toVertex( + serviceName = serviceName, + columnName = columnName, + id = param.id, + props = param.props, + ts = param.ts) + } + def partialEdgeParamToS2Edge(labelName: String, param: PartialEdgeParam): S2EdgeLike = { graph.toEdge( srcId = param.from, @@ -59,13 +68,39 @@ class GraphRepository(graph: S2GraphLike) { ) } + def addVertex(args: Args): Future[Option[MutateResponse]] = { + val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => + val innerMap = args.arg[Vector[PartialServiceVertexParam]](serviceName) + val ret = innerMap.map { param => + partialVertexParamToS2Vertex(serviceName, param.columnName, param.vertexParam) + } + + ret + } + + graph.mutateVertices(vertices, withWait = true).map(_.headOption) + } + + def addVertices(args: Args): Future[Seq[MutateResponse]] = { + val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => + val innerMap = args.arg[Map[String, Vector[PartialVertexParam]]](serviceName) + + innerMap.flatMap { case (columnName, params) => + params.map { param => + partialVertexParamToS2Vertex(serviceName, columnName, param) + } + } + } + graph.mutateVertices(vertices, withWait = true) + } + def addEdges(args: Args): Future[Seq[MutateResponse]] = { val edges: Seq[S2EdgeLike] = args.raw.keys.toList.flatMap { labelName => val params = args.arg[Vector[PartialEdgeParam]](labelName) params.map(param => partialEdgeParamToS2Edge(labelName, param)) } - graph.mutateEdges(edges) + graph.mutateEdges(edges, withWait = true) } def addEdge(args: Args): Future[Option[MutateResponse]] = { @@ -74,9 +109,10 @@ class GraphRepository(graph: S2GraphLike) { partialEdgeParamToS2Edge(labelName, param) } - graph.mutateEdges(edges).map(_.headOption) + graph.mutateEdges(edges, withWait = 
true).map(_.headOption) } + def getEdges(vertex: S2VertexLike, label: Label, _dir: String): Future[Seq[S2EdgeLike]] = { val dir = GraphUtil.directions(_dir) val labelWithDir = LabelWithDirection(label.id.get, dir) @@ -110,6 +146,15 @@ class GraphRepository(graph: S2GraphLike) { } } + def createServiceColumn(args: Args): Try[ServiceColumn] = { + val serviceName = args.arg[String]("serviceName") + val columnName = args.arg[String]("columnName") + val columnType = args.arg[String]("columnType") + val props = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty) + + Try { management.createServiceColumn(serviceName, columnName, columnType, props) } + } + def createLabel(args: Args): Try[Label] = { val labelName = args.arg[String]("name") @@ -155,6 +200,8 @@ class GraphRepository(graph: S2GraphLike) { def allServices: List[Service] = Service.findAll() + def allServiceColumns: List[ServiceColumn] = ServiceColumn.findAll() + def findServiceByName(name: String): Option[Service] = Service.findByName(name) def allLabels: List[Label] = Label.findAll() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4aaebbf5/s2graphql/src/main/scala/S2Type.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/S2Type.scala b/s2graphql/src/main/scala/S2Type.scala index 59eae4c..c584363 100644 --- a/s2graphql/src/main/scala/S2Type.scala +++ b/s2graphql/src/main/scala/S2Type.scala @@ -23,6 +23,7 @@ import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.MutateResponse +import org.apache.s2graph.core.utils.logger import play.api.libs.json.JsValue import sangria.marshalling.{CoercedScalaResultMarshaller, FromInput} import sangria.schema._ @@ -38,7 +39,13 @@ object S2Type { case class MutationResponse[T](result: Try[T]) - case class PartialVertexParam(service: Service, vid: JsValue) + case class 
PartialServiceParam(service: Service, vid: JsValue) + + case class PartialVertexParam(ts: Long, + id: Any, + props: Map[String, Any]) + + case class PartialServiceVertexParam(columnName: String, vertexParam: PartialVertexParam) case class PartialEdgeParam(ts: Long, from: Any, @@ -46,6 +53,41 @@ object S2Type { direction: String, props: Map[String, Any]) + implicit object PartialServiceVertexParamFromInput extends FromInput[Vector[PartialServiceVertexParam]] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val inputMap = node.asInstanceOf[Map[String, marshaller.Node]] + + val ret = inputMap.toVector.map { case (columnName, node) => + val param = PartialVertexFromInput.fromResult(node) + PartialServiceVertexParam(columnName, param) + } + + ret + } + } + + implicit object PartialVertexFromInput extends FromInput[PartialVertexParam] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + + val inputMap = node.asInstanceOf[Map[String, Any]] + val id = inputMap("id") + val ts = inputMap.get("timestamp") match { + case Some(Some(v)) => v.asInstanceOf[Long] + case _ => System.currentTimeMillis() + } + val props = inputMap.get("props") match { + case Some(Some(v)) => v.asInstanceOf[Map[String, Option[Any]]].filter(_._2.isDefined).mapValues(_.get) + case _ => Map.empty[String, Any] + } + + PartialVertexParam(ts, id, props) + } + } + implicit object PartialEdgeFromInput extends FromInput[PartialEdgeParam] { val marshaller = CoercedScalaResultMarshaller.default @@ -124,6 +166,12 @@ class S2Type(repo: GraphRepository) { lazy val ServiceNameArg = Argument("name", OptionInputType(ServiceListType), description = "desc here") + lazy val ServiceNameRawArg = Argument("serviceName", ServiceListType, description = "desc here") + + lazy val ColumnNameArg = Argument("columnName", OptionInputType(ServiceColumnListType), description = "desc here") + + lazy val ColumnTypeArg = 
Argument("columnType", DataTypeType, description = "desc here") + lazy val LabelNameArg = Argument("name", OptionInputType(LabelListType), description = "desc here") lazy val PropArg = Argument("props", OptionInputType(ListInputType(InputPropType)), description = "desc here") @@ -133,7 +181,20 @@ class S2Type(repo: GraphRepository) { lazy val ServiceType = deriveObjectType[GraphRepository, Service]( ObjectTypeName("Service"), ObjectTypeDescription("desc here"), - RenameField("serviceName", "name") + RenameField("serviceName", "name"), + AddFields( + Field("serviceColumns", ListType(ServiceColumnType), resolve = c => c.value.serviceColumns.toList) + ) + ) + + lazy val ServiceColumnType = deriveObjectType[GraphRepository, ServiceColumn]( + ObjectTypeName("ServiceColumn"), + ObjectTypeDescription("desc here"), + RenameField("columnName", "name"), + AddFields( + Field("props", ListType(ColumnMetaType), + resolve = c => c.value.metas.filter(ColumnMeta.isValid)) + ) ) lazy val LabelMetaType = deriveObjectType[GraphRepository, LabelMeta]( @@ -141,6 +202,11 @@ class S2Type(repo: GraphRepository) { ExcludeFields("seq", "labelId") ) + lazy val ColumnMetaType = deriveObjectType[GraphRepository, ColumnMeta]( + ObjectTypeName("ColumnMeta"), + ExcludeFields("seq", "columnId") + ) + lazy val DataTypeType = EnumType( "DataType", description = Option("desc here"), @@ -192,6 +258,15 @@ class S2Type(repo: GraphRepository) { } ) + lazy val ServiceColumnListType = EnumType( + s"ServiceColumnList", + description = Option("desc here"), + values = + dummyEnum +: repo.allServiceColumns.map { serviceColumn => + EnumValue(serviceColumn.columnName, value = serviceColumn.columnName) + } + ) + lazy val LabelListType = EnumType( s"LabelList", description = Option("desc here"), @@ -245,6 +320,29 @@ class S2Type(repo: GraphRepository) { RenameField("label", "name") ) + def makeInputPartialVertexParamType(service: Service, + serviceColumn: ServiceColumn): InputObjectType[PartialVertexParam] = { + 
lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( + s"${service.serviceName}_${serviceColumn.columnName}_props", + description = "desc here", + () => serviceColumn.metas.filter(ColumnMeta.isValid).map { lm => + InputField(lm.name, OptionInputType(s2TypeToScalarType(lm.dataType))) + } + ) + + lazy val fields = List( + InputField("_", OptionInputType(LongType)) + ) + + InputObjectType[PartialVertexParam]( + s"${service.serviceName}_${serviceColumn.columnName}_mutate", + description = "desc here", + () => + if (serviceColumn.metas.filter(ColumnMeta.isValid).isEmpty) fields + else List(InputField("props", OptionInputType(InputPropsType))) + ) + } + def makeInputPartialEdgeParamType(label: Label): InputObjectType[PartialEdgeParam] = { lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( s"${label.label}_props", @@ -270,6 +368,37 @@ class S2Type(repo: GraphRepository) { ) } + lazy val VertexArg = repo.allServices.map { service => + val columnArgs = service.serviceColumns.map { serviceColumn => + val inputParialVertexParamType = makeInputPartialVertexParamType(service, serviceColumn) + val tpe = InputObjectType[PartialServiceVertexParam]( + serviceColumn.columnName, + fields = List( + InputField("id", s2TypeToScalarType(serviceColumn.columnType)), + InputField("timestamp", OptionInputType(LongType)), + InputField("props", OptionInputType(inputParialVertexParamType)) + ) + ) + + InputField(serviceColumn.columnName, tpe) + } + + val vertexParamType = InputObjectType[Vector[PartialServiceVertexParam]]( + s"${service.serviceName}_column", + description = "desc here", + fields = columnArgs.toList + ) + + Argument(service.serviceName, vertexParamType) + } + + lazy val verticesArg = repo.allServices.flatMap { service => + service.serviceColumns.map { serviceColumn => + val inputParialVertexParamType = makeInputPartialVertexParamType(service, serviceColumn) + Argument(serviceColumn.columnName, 
OptionInputType(ListInputType(inputParialVertexParamType))) + } + } + lazy val EdgeArg = repo.allLabels.map { label => val inputPartialEdgeParamType = makeInputPartialEdgeParamType(label) Argument(label.label, OptionInputType(inputPartialEdgeParamType)) @@ -288,6 +417,7 @@ class S2Type(repo: GraphRepository) { "hTableTTL" -> IntType ).map { case (name, _type) => Argument(name, OptionInputType(_type)) } + lazy val labelRequiredArg = List( "sourceService" -> InputLabelServiceType, "targetService" -> InputLabelServiceType @@ -307,14 +437,20 @@ class S2Type(repo: GraphRepository) { ServiceType ) + lazy val ServiceColumnMutationResponseType = makeMutationResponseType[ServiceColumn]( + "CreateServiceColumn", + "desc here", + ServiceColumnType + ) + lazy val LabelMutationResponseType = makeMutationResponseType[Label]( "CreateLabel", "desc here", LabelType ) - lazy val EdgeMutateResponseType = deriveObjectType[GraphRepository, MutateResponse]( - ObjectTypeName("EdgeMutateResponse"), + lazy val MutateResponseType = deriveObjectType[GraphRepository, MutateResponse]( + ObjectTypeName("MutateResponse"), ObjectTypeDescription("desc here"), AddFields( Field("isSuccess", BooleanType, resolve = c => c.value.isSuccess) @@ -350,7 +486,7 @@ class S2Type(repo: GraphRepository) { PlayJsonPolyType.PolyType, description = Some("desc here"), resolve = _.value match { - case v: PartialVertexParam => v.vid + case v: PartialServiceParam => v.vid case _ => throw new RuntimeException("dead code") } ) @@ -416,9 +552,9 @@ class S2Type(repo: GraphRepository) { val vertex: S2VertexLike = c.value match { case v: S2VertexLike => v case e: S2Edge => if (dir == "out") e.tgtVertex else e.srcVertex - case vp: PartialVertexParam => - if (dir == "out") c.ctx.partialVertexParamToVertex(label.tgtColumn, vp) - else c.ctx.partialVertexParamToVertex(label.srcColumn, vp) + case vp: PartialServiceParam => + if (dir == "out") c.ctx.partialServiceParamToVertex(label.tgtColumn, vp) + else 
c.ctx.partialServiceParamToVertex(label.srcColumn, vp) } c.ctx.getEdges(vertex, label, dir) @@ -446,7 +582,7 @@ class S2Type(repo: GraphRepository) { val ids = c.argOpt[List[JsValue]]("ids").toList.flatten val svc = c.ctx.findServiceByName(service.serviceName).get - (id ++ ids).map { vid => PartialVertexParam(svc, vid) } + (id ++ ids).map { vid => PartialServiceParam(svc, vid) } } ): Field[GraphRepository, Any] } @@ -464,6 +600,19 @@ class S2Type(repo: GraphRepository) { } ) + lazy val serviceColumnField: Field[GraphRepository, Any] = Field( + "ServiceColumn", + ListType(ServiceColumnType), + description = Option("desc here"), + arguments = List(ServiceNameRawArg, ColumnNameArg, PropArg), + resolve = { c => + c.argOpt[String]("name") match { + case Some(name) => c.ctx.allServiceColumns.filter(_.columnName == name) + case None => c.ctx.allServiceColumns + } + } + ) + lazy val labelField: Field[GraphRepository, Any] = Field( "Labels", ListType(LabelType), @@ -500,18 +649,33 @@ class S2Type(repo: GraphRepository) { arguments = NameArg :: serviceOptArgs, resolve = c => MutationResponse(c.ctx.createService(c.args)) ), + Field("createServiceColumn", + ServiceColumnMutationResponseType, + arguments = List(ServiceNameRawArg, Argument("columnName", StringType), ColumnTypeArg, PropArg), + resolve = c => MutationResponse(c.ctx.createServiceColumn(c.args)) + ), Field("createLabel", LabelMutationResponseType, arguments = NameArg :: PropArg :: IndicesArg :: labelRequiredArg ::: labelOptsArgs, resolve = c => MutationResponse(c.ctx.createLabel(c.args)) ), + Field("addVertex", + OptionType(MutateResponseType), + arguments = VertexArg, + resolve = c => c.ctx.addVertex(c.args) + ), + Field("addVertices", + ListType(MutateResponseType), + arguments = verticesArg, + resolve = c => c.ctx.addVertices(c.args) + ), Field("addEdge", - OptionType(EdgeMutateResponseType), + OptionType(MutateResponseType), arguments = EdgeArg, resolve = c => c.ctx.addEdge(c.args) ), Field("addEdges", - 
ListType(EdgeMutateResponseType), + ListType(MutateResponseType), arguments = EdgesArg, resolve = c => c.ctx.addEdges(c.args) )
