add api: deleteServiceColumn, deleteLabel
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/bab13a32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/bab13a32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/bab13a32 Branch: refs/heads/master Commit: bab13a32d6c7842f460b357a325325ce051ccae0 Parents: aeaff3f Author: daewon <[email protected]> Authored: Tue Feb 27 18:59:44 2018 +0900 Committer: daewon <[email protected]> Committed: Tue Feb 27 18:59:44 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/Management.scala | 13 +- .../apache/s2graph/core/S2VertexProperty.scala | 3 + .../apache/s2graph/core/mysqls/Service.scala | 2 +- s2graphql/src/main/resources/application.conf | 14 +- s2graphql/src/main/scala/GraphQLServer.scala | 110 --- s2graphql/src/main/scala/GraphRepository.scala | 211 ------ s2graphql/src/main/scala/HttpServer.scala | 61 -- s2graphql/src/main/scala/S2Type.scala | 683 ------------------- .../main/scala/SangriaPlayJsonScalarType.scala | 76 --- s2graphql/src/main/scala/SchemaDef.scala | 36 - .../apache/s2graph/graphql/GraphQLServer.scala | 107 +++ .../org/apache/s2graph/graphql/HttpServer.scala | 62 ++ .../s2graph/graphql/marshaller/package.scala | 102 +++ .../graphql/repository/GraphRepository.scala | 230 +++++++ .../graphql/types/S2ManagementType.scala | 274 ++++++++ .../apache/s2graph/graphql/types/S2Type.scala | 328 +++++++++ .../types/SangriaPlayJsonScalarType.scala | 76 +++ .../s2graph/graphql/types/SchemaDef.scala | 51 ++ .../apache/s2graph/graphql/types/package.scala | 156 +++++ 19 files changed, 1404 insertions(+), 1191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 545aabe..6a12f0a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -118,7 +118,7 @@ object Management { val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = false) val columnNames = serviceColumns.map { serviceColumn => ServiceColumn.delete(serviceColumn.id.get) - serviceColumn.columnName + serviceColumn } columnNames.getOrElse(throw new RuntimeException("column not found")) @@ -129,12 +129,11 @@ object Management { Label.findByName(labelName, useCache = useCache) } - def deleteLabel(labelName: String) = { + def deleteLabel(labelName: String): Try[Label] = { Model withTx { implicit session => - Label.findByName(labelName, useCache = false).foreach { label => - Label.deleteAll(label) - } - labelName + val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName)) + Label.deleteAll(label) + label } } @@ -298,6 +297,8 @@ object Management { } class Management(graph: S2GraphLike) { + + import Management._ import scala.collection.JavaConversions._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala index 1fbc894..e0abfba 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala @@ -48,6 +48,9 @@ case class S2VertexProperty[V](element: S2VertexLike, innerVal.bytes } + + val valueAny = castValue(v, columnMeta.dataType) + val value = castValue(v, columnMeta.dataType).asInstanceOf[V] override def properties[U](strings: String*): util.Iterator[Property[U]] = ??? http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala index dd1e87c..5b4f494 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -123,6 +123,6 @@ case class Service(id: Option[Int], lazy val extraOptions = Model.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig - lazy val serviceColumns: Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = true) + def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache) def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/resources/application.conf b/s2graphql/src/main/resources/application.conf index 335714e..63ceb53 100644 --- a/s2graphql/src/main/resources/application.conf +++ b/s2graphql/src/main/resources/application.conf @@ -22,12 +22,12 @@ akka { loglevel = "INFO" } -//db.default.password = sa -//db.default.user = sa -//s2graph.storage.backend = rocks -//rocks.storage.file.path = rocks_db -//rocks.storage.mode = production -//rocks.storage.ttl = -1 -//rocks.storage.read.only = false +db.default.password = sa +db.default.user = sa +s2graph.storage.backend = rocks +rocks.storage.file.path = rocks_db +rocks.storage.mode = production +rocks.storage.ttl = -1 +rocks.storage.read.only = false http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/GraphQLServer.scala b/s2graphql/src/main/scala/GraphQLServer.scala deleted file mode 100644 index 1d173a8..0000000 --- a/s2graphql/src/main/scala/GraphQLServer.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph - -import java.util.concurrent.Executors - -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import com.typesafe.config.ConfigFactory -import org.apache.s2graph.core.S2Graph -import org.apache.s2graph.core.utils.SafeUpdateCache -import sangria.ast.Document -import sangria.execution._ -import sangria.marshalling.sprayJson._ -import sangria.parser.QueryParser -import sangria.renderer.SchemaRenderer -import sangria.schema.Schema -import spray.json.{JsObject, JsString, JsValue} - -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success, Try} - -object GraphQLServer { - - // Init s2graph - val numOfThread = Runtime.getRuntime.availableProcessors() - val threadPool = Executors.newFixedThreadPool(numOfThread * 2) - - implicit val ec = ExecutionContext.fromExecutor(threadPool) - - val config = ConfigFactory.load() - val s2graph = new S2Graph(config) - val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1) - val s2Repository = new GraphRepository(s2graph) - val schemaCache = new SafeUpdateCache[Schema[GraphRepository, Any]]("schema", maxSize = 1, ttl = schemaCacheTTL) - - def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { - - val spray.json.JsObject(fields) = requestJSON - val spray.json.JsString(query) = fields("query") - - val operation = fields.get("operationName") collect { - case spray.json.JsString(op) => op - } - - val vars = fields.get("variables") match { - case Some(obj: spray.json.JsObject) => obj - case _ => spray.json.JsObject.empty - } - - QueryParser.parse(query) match { - case Success(queryAst) => complete(executeGraphQLQuery(queryAst, operation, vars)) - case Failure(error) => complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage))) - } - } - - /** - * In development mode(schemaCacheTTL = -1), - * a new schema is created for each request. - */ - println(s"schemaCacheTTL: ${schemaCacheTTL}") - - private def createNewSchema(): Schema[GraphRepository, Any] = { - println(s"Schema updated: ${System.currentTimeMillis()}") - - val s2Type = new S2Type(s2Repository) - val newSchema = new SchemaDef(s2Type).S2GraphSchema - -// println(SchemaRenderer.renderSchema(newSchema)) - println("-" * 80) - - newSchema - } - - private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = { - val s2schema = schemaCache.withCache("s2Schema")(createNewSchema()) - - Executor.execute( - s2schema, - query, - s2Repository, - variables = vars, - operationName = op - ) - .map((res: spray.json.JsValue) => OK -> res) - .recover { - case error: QueryAnalysisError => BadRequest -> error.resolveError - case error: ErrorWithResolver => InternalServerError -> error.resolveError - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/GraphRepository.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/GraphRepository.scala b/s2graphql/src/main/scala/GraphRepository.scala deleted file mode 100644 index 1dd7707..0000000 --- a/s2graphql/src/main/scala/GraphRepository.scala +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph - -import org.apache.s2graph.S2Type._ -import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} -import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, LabelIndex, Service, ServiceColumn} -import org.apache.s2graph.core.rest.RequestParser -import org.apache.s2graph.core.storage.MutateResponse -import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection} -import play.api.libs.json._ -import sangria.schema.{Action, Args} - -import scala.concurrent._ -import scala.util.{Failure, Success, Try} - - -/** - * - * @param graph - */ -class GraphRepository(graph: S2GraphLike) { - - val management = graph.management - val parser = new RequestParser(graph) - - implicit val ec = graph.ec - - def partialServiceParamToVertex(column: ServiceColumn, param: PartialServiceParam): S2VertexLike = { - val vid = JSONParser.jsValueToInnerVal(param.vid, column.columnType, column.schemaVersion).get - graph.toVertex(param.service.serviceName, column.columnName, vid) - } - - def partialVertexParamToS2Vertex(serviceName: String, columnName: String, param: PartialVertexParam): S2VertexLike = { - graph.toVertex( - serviceName = serviceName, - columnName = columnName, - id = param.id, - props = param.props, - ts = param.ts) - } - - def partialEdgeParamToS2Edge(labelName: String, param: PartialEdgeParam): S2EdgeLike = { - graph.toEdge( - srcId = param.from, - tgtId = param.to, - labelName = labelName, - props = param.props, - direction = param.direction - ) - } - - def addVertex(args: Args): Future[Option[MutateResponse]] = { - val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => - val innerMap = args.arg[Vector[PartialServiceVertexParam]](serviceName) - val ret = innerMap.map { param => - partialVertexParamToS2Vertex(serviceName, param.columnName, param.vertexParam) - } - - ret - } - - graph.mutateVertices(vertices, withWait = true).map(_.headOption) - } - - def addVertices(args: Args): Future[Seq[MutateResponse]] = { - val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => - val innerMap = args.arg[Map[String, Vector[PartialVertexParam]]](serviceName) - - innerMap.flatMap { case (columnName, params) => - params.map { param => - partialVertexParamToS2Vertex(serviceName, columnName, param) - } - } - } - graph.mutateVertices(vertices, withWait = true) - } - - def addEdges(args: Args): Future[Seq[MutateResponse]] = { - val edges: Seq[S2EdgeLike] = args.raw.keys.toList.flatMap { labelName => - val params = args.arg[Vector[PartialEdgeParam]](labelName) - params.map(param => partialEdgeParamToS2Edge(labelName, param)) - } - - graph.mutateEdges(edges, withWait = true) - } - - def addEdge(args: Args): Future[Option[MutateResponse]] = { - val edges: Seq[S2EdgeLike] = args.raw.keys.toList.map { labelName => - val param = args.arg[PartialEdgeParam](labelName) - partialEdgeParamToS2Edge(labelName, param) - } - - graph.mutateEdges(edges, withWait = true).map(_.headOption) - } - - - def getEdges(vertex: S2VertexLike, label: Label, _dir: String): Future[Seq[S2EdgeLike]] = { - val dir = GraphUtil.directions(_dir) - val labelWithDir = LabelWithDirection(label.id.get, dir) - val step = Step(Seq(QueryParam(labelWithDir))) - val q = Query(Seq(vertex), steps = Vector(step)) - - graph.getEdges(q).map(_.edgeWithScores.map(_.edge)) - } - - def createService(args: Args): Try[Service] = { - val serviceName = args.arg[String]("name") - - Service.findByName(serviceName) match { - case Some(_) => Failure(new RuntimeException(s"Service (${serviceName}) already exists")) - case None => - val cluster = args.argOpt[String]("cluster").getOrElse(parser.DefaultCluster) - val hTableName = args.argOpt[String]("hTableName").getOrElse(s"${serviceName}-${parser.DefaultPhase}") - val preSplitSize = args.argOpt[Int]("preSplitSize").getOrElse(1) - val hTableTTL = args.argOpt[Int]("hTableTTL") - val compressionAlgorithm = args.argOpt[String]("compressionAlgorithm").getOrElse(parser.DefaultCompressionAlgorithm) - - val serviceTry = management - .createService(serviceName, - cluster, - hTableName, - preSplitSize, - hTableTTL, - compressionAlgorithm) - - serviceTry - } - } - - def createServiceColumn(args: Args): Try[ServiceColumn] = { - val serviceName = args.arg[String]("serviceName") - val columnName = args.arg[String]("columnName") - val columnType = args.arg[String]("columnType") - val props = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty) - - Try { management.createServiceColumn(serviceName, columnName, columnType, props) } - } - - def createLabel(args: Args): Try[Label] = { - val labelName = args.arg[String]("name") - - val srcServiceProp = args.arg[LabelServiceProp]("sourceService") - val tgtServiceProp = args.arg[LabelServiceProp]("targetService") - - val allProps = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty) - val indices = args.argOpt[Vector[Index]]("indices").getOrElse(Vector.empty) - - val serviceName = args.argOpt[String]("serviceName").getOrElse(tgtServiceProp.name) - val consistencyLevel = args.argOpt[String]("consistencyLevel").getOrElse("weak") - val hTableName = args.argOpt[String]("hTableName") - val hTableTTL = args.argOpt[Int]("hTableTTL") - val schemaVersion = args.argOpt[String]("schemaVersion").getOrElse(HBaseType.DEFAULT_VERSION) - val isAsync = args.argOpt("isAsync").getOrElse(false) - val compressionAlgorithm = args.argOpt[String]("compressionAlgorithm").getOrElse(parser.DefaultCompressionAlgorithm) - val isDirected = args.argOpt[Boolean]("isDirected").getOrElse(true) - val options = args.argOpt[String]("options") // TODO: support option type - - val labelTry: scala.util.Try[Label] = management.createLabel( - labelName, - srcServiceProp.name, - srcServiceProp.columnName, - srcServiceProp.dataType, - tgtServiceProp.name, - tgtServiceProp.columnName, - tgtServiceProp.dataType, - isDirected, - serviceName, - indices, - allProps, - consistencyLevel, - hTableName, - hTableTTL, - schemaVersion, - isAsync, - compressionAlgorithm, - options - ) - - labelTry - } - - def allServices: List[Service] = Service.findAll() - - def allServiceColumns: List[ServiceColumn] = ServiceColumn.findAll() - - def findServiceByName(name: String): Option[Service] = Service.findByName(name) - - def allLabels: List[Label] = Label.findAll() - - def findLabelByName(name: String): Option[Label] = Label.findByName(name) - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/HttpServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/HttpServer.scala b/s2graphql/src/main/scala/HttpServer.scala deleted file mode 100644 index cf1cc0c..0000000 --- a/s2graphql/src/main/scala/HttpServer.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer -import akka.http.scaladsl.server._ -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ - -import Console._ -import scala.concurrent.Await -import scala.language.postfixOps - -object Server extends App { - - implicit val actorSystem = ActorSystem("s2graphql-server") - implicit val materializer = ActorMaterializer() - - import actorSystem.dispatcher - import scala.concurrent.duration._ - - println("Starting GRAPHQL server...") - - val route: Route = - (post & path("graphql")) { - entity(as[spray.json.JsValue])(GraphQLServer.endpoint) - } ~ { - getFromResource("graphiql.html") - } - - val port = sys.props.get("http.port").fold(8000)(_.toInt) - Http().bindAndHandle(route, "0.0.0.0", port) - - - def shutdown(): Unit = { - println("Terminating...") - actorSystem.terminate() - Await.result(actorSystem.whenTerminated, 10 seconds) - - println("Terminated.") - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/S2Type.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/S2Type.scala b/s2graphql/src/main/scala/S2Type.scala deleted file mode 100644 index c584363..0000000 --- a/s2graphql/src/main/scala/S2Type.scala +++ /dev/null @@ -1,683 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph - -import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} -import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls._ -import org.apache.s2graph.core.storage.MutateResponse -import org.apache.s2graph.core.utils.logger -import play.api.libs.json.JsValue -import sangria.marshalling.{CoercedScalaResultMarshaller, FromInput} -import sangria.schema._ - -import scala.language.existentials -import scala.util.{Failure, Success, Try} - -object S2Type { - - import sangria.schema._ - - case class LabelServiceProp(name: String, columnName: String, dataType: String) - - case class MutationResponse[T](result: Try[T]) - - case class PartialServiceParam(service: Service, vid: JsValue) - - case class PartialVertexParam(ts: Long, - id: Any, - props: Map[String, Any]) - - case class PartialServiceVertexParam(columnName: String, vertexParam: PartialVertexParam) - - case class PartialEdgeParam(ts: Long, - from: Any, - to: Any, - direction: String, - props: Map[String, Any]) - - implicit object PartialServiceVertexParamFromInput extends FromInput[Vector[PartialServiceVertexParam]] { - val marshaller = CoercedScalaResultMarshaller.default - - def fromResult(node: marshaller.Node) = { - val inputMap = node.asInstanceOf[Map[String, marshaller.Node]] - - val ret = inputMap.toVector.map { case (columnName, node) => - val param = PartialVertexFromInput.fromResult(node) - PartialServiceVertexParam(columnName, param) - } - - ret - } - } - - implicit object PartialVertexFromInput extends FromInput[PartialVertexParam] { - val marshaller = CoercedScalaResultMarshaller.default - - def fromResult(node: marshaller.Node) = { - - val inputMap = node.asInstanceOf[Map[String, Any]] - val id = inputMap("id") - val ts = inputMap.get("timestamp") match { - case Some(Some(v)) => v.asInstanceOf[Long] - case _ => System.currentTimeMillis() - } - val props = inputMap.get("props") match { - case Some(Some(v)) => v.asInstanceOf[Map[String, Option[Any]]].filter(_._2.isDefined).mapValues(_.get) - case _ => Map.empty[String, Any] - } - - PartialVertexParam(ts, id, props) - } - } - - implicit object PartialEdgeFromInput extends FromInput[PartialEdgeParam] { - val marshaller = CoercedScalaResultMarshaller.default - - def fromResult(node: marshaller.Node) = { - val inputMap = node.asInstanceOf[Map[String, Any]] - - val from = inputMap("from") - val to = inputMap("to") - - val ts = inputMap.get("timestamp") match { - case Some(Some(v)) => v.asInstanceOf[Long] - case _ => System.currentTimeMillis() - } - - val dir = inputMap.get("direction") match { - case Some(Some(v)) => v.asInstanceOf[String] - case _ => "out" - } - - val props = inputMap.get("props") match { - case Some(Some(v)) => v.asInstanceOf[Map[String, Option[Any]]].filter(_._2.isDefined).mapValues(_.get) - case _ => Map.empty[String, Any] - } - - PartialEdgeParam(ts, from, to, dir, props) - } - } - - implicit object IndexFromInput extends FromInput[Index] { - val marshaller = CoercedScalaResultMarshaller.default - - def fromResult(node: marshaller.Node) = { - val input = node.asInstanceOf[Map[String, Any]] - Index(input("name").asInstanceOf[String], input("propNames").asInstanceOf[Seq[String]]) - } - } - - implicit object PropFromInput extends FromInput[Prop] { - val marshaller = CoercedScalaResultMarshaller.default - - def fromResult(node: marshaller.Node) = { - val input = node.asInstanceOf[Map[String, String]] - Prop(input("name"), input("defaultValue"), input("dataType")) - } - } - - implicit object LabelServiceFromInput extends FromInput[LabelServiceProp] { - val marshaller = CoercedScalaResultMarshaller.default - - def fromResult(node: marshaller.Node) = { - val input = node.asInstanceOf[Map[String, String]] - LabelServiceProp(input("name"), input("columnName"), input("dataType")) - } - } - - def s2TypeToScalarType(from: String): ScalarType[_] = from match { - case "string" => StringType - case "int" => IntType - case "integer" => IntType - case "long" => LongType - case "float" => FloatType - case "double" => FloatType - case "boolean" => BooleanType - case "bool" => BooleanType - } -} - -class S2Type(repo: GraphRepository) { - - import sangria.macros.derive._ - import S2Type._ - - lazy val DirArg = Argument("direction", OptionInputType(DirectionType), "desc here", defaultValue = "out") - - lazy val NameArg = Argument("name", StringType, description = "desc here") - - lazy val ServiceNameArg = Argument("name", OptionInputType(ServiceListType), description = "desc here") - - lazy val ServiceNameRawArg = Argument("serviceName", ServiceListType, description = "desc here") - - lazy val ColumnNameArg = Argument("columnName", OptionInputType(ServiceColumnListType), description = "desc here") - - lazy val ColumnTypeArg = Argument("columnType", DataTypeType, description = "desc here") - - lazy val LabelNameArg = Argument("name", OptionInputType(LabelListType), description = "desc here") - - lazy val PropArg = Argument("props", OptionInputType(ListInputType(InputPropType)), description = "desc here") - - lazy val IndicesArg = Argument("indices", OptionInputType(ListInputType(InputIndexType)), description = "desc here") - - lazy val ServiceType = deriveObjectType[GraphRepository, Service]( - ObjectTypeName("Service"), - ObjectTypeDescription("desc here"), - RenameField("serviceName", "name"), - AddFields( - Field("serviceColumns", ListType(ServiceColumnType), resolve = c => c.value.serviceColumns.toList) - ) - ) - - lazy val ServiceColumnType = deriveObjectType[GraphRepository, ServiceColumn]( - ObjectTypeName("ServiceColumn"), - ObjectTypeDescription("desc here"), - RenameField("columnName", "name"), - AddFields( - Field("props", ListType(ColumnMetaType), - resolve = c => c.value.metas.filter(ColumnMeta.isValid)) - ) - ) - - lazy val LabelMetaType = deriveObjectType[GraphRepository, LabelMeta]( - ObjectTypeName("LabelMeta"), - ExcludeFields("seq", "labelId") - ) - - lazy val ColumnMetaType = deriveObjectType[GraphRepository, ColumnMeta]( - ObjectTypeName("ColumnMeta"), - ExcludeFields("seq", "columnId") - ) - - lazy val DataTypeType = EnumType( - "DataType", - description = Option("desc here"), - values = List( - EnumValue("string", value = "string"), - EnumValue("int", value = "int"), - EnumValue("long", value = "long"), - EnumValue("float", value = "float"), - EnumValue("boolean", value = "boolean") - ) - ) - - lazy val DirectionType = EnumType( - "Direction", - description = Option("desc here"), - values = List( - EnumValue("out", value = "out"), - EnumValue("in", value = "in") - ) - ) - - lazy val InputIndexType = InputObjectType[Index]( - "Index", - description = "desc here", - fields = List( - InputField("name", StringType), - InputField("propNames", ListInputType(StringType)) - ) - ) - - lazy val InputPropType = InputObjectType[Prop]( - "Prop", - description = "desc here", - fields = List( - InputField("name", StringType), - InputField("dataType", DataTypeType), - InputField("defaultValue", StringType) - ) - ) - - lazy val dummyEnum = EnumValue("_", value = "_") - - lazy val ServiceListType = EnumType( - s"ServiceList", - description = Option("desc here"), - values = - dummyEnum +: repo.allServices.map { service => - EnumValue(service.serviceName, value = service.serviceName) - } - ) - - lazy val ServiceColumnListType = EnumType( - s"ServiceColumnList", - description = Option("desc here"), - values = - dummyEnum +: repo.allServiceColumns.map { serviceColumn => - EnumValue(serviceColumn.columnName, value = serviceColumn.columnName) - } - ) - - lazy val LabelListType = EnumType( - s"LabelList", - description = Option("desc here"), - values = - dummyEnum +: repo.allLabels.map { label => - EnumValue(label.label, value = label.label) - } - ) - - lazy val CompressionAlgorithmType = EnumType( - "CompressionAlgorithm", - description = Option("desc here"), - values = List( - EnumValue("gz", description = Option("desc here"), value = "gz"), - EnumValue("lz4", description = Option("desc here"), value = "lz4") - ) - ) - - lazy val ConsistencyLevelType = EnumType( - "ConsistencyList", - description = Option("desc here"), - values = List( - EnumValue("weak", description = Option("desc here"), value = "weak"), - EnumValue("strong", description = Option("desc here"), value = "strong") - ) - ) - - lazy val InputLabelServiceType = InputObjectType[LabelServiceProp]( - "LabelServiceProp", - description = "desc here", - fields = List( - InputField("name", ServiceListType), - InputField("columnName", StringType), - InputField("dataType", DataTypeType) - ) - ) - - lazy val LabelIndexType = deriveObjectType[GraphRepository, LabelIndex]( - ObjectTypeName("LabelIndex"), - ObjectTypeDescription("desc here"), - ExcludeFields("seq", "metaSeqs", "formulars", "labelId") - ) - - lazy val LabelType = deriveObjectType[GraphRepository, Label]( - ObjectTypeName("Label"), - ObjectTypeDescription("desc here"), - AddFields( - Field("indexes", ListType(LabelIndexType), resolve = c => Nil), - Field("props", ListType(LabelMetaType), resolve = c => Nil) - ), - RenameField("label", "name") - ) - - def makeInputPartialVertexParamType(service: Service, - serviceColumn: ServiceColumn): InputObjectType[PartialVertexParam] = { - lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( - s"${service.serviceName}_${serviceColumn.columnName}_props", - description = "desc here", - () => serviceColumn.metas.filter(ColumnMeta.isValid).map { lm => - InputField(lm.name, OptionInputType(s2TypeToScalarType(lm.dataType))) - } - ) - - lazy val fields = List( - InputField("_", OptionInputType(LongType)) - ) - - InputObjectType[PartialVertexParam]( - s"${service.serviceName}_${serviceColumn.columnName}_mutate", - description = "desc here", - () => - if (serviceColumn.metas.filter(ColumnMeta.isValid).isEmpty) fields - else List(InputField("props", OptionInputType(InputPropsType))) - ) - } - - def makeInputPartialEdgeParamType(label: Label): InputObjectType[PartialEdgeParam] = { - lazy val InputPropsType = InputObjectType[Map[String, ScalarType[_]]]( - s"${label.label}_props", - description = "desc here", - () => label.labelMetaSet.toList.map { lm => - InputField(lm.name, OptionInputType(s2TypeToScalarType(lm.dataType))) - } - ) - - lazy val labelFields = List( - InputField("timestamp", OptionInputType(LongType)), - InputField("from", s2TypeToScalarType(label.srcColumnType)), - InputField("to", s2TypeToScalarType(label.srcColumnType)), - InputField("direction", OptionInputType(DirectionType)) - ) - - InputObjectType[PartialEdgeParam]( - s"${label.label}_mutate", - description = "desc here", - () => - if (label.labelMetaSet.isEmpty) labelFields - else labelFields ++ Seq(InputField("props", OptionInputType(InputPropsType))) - ) - } - - lazy val VertexArg = repo.allServices.map { service => - val columnArgs = service.serviceColumns.map { serviceColumn => - val inputParialVertexParamType = makeInputPartialVertexParamType(service, serviceColumn) - val tpe = InputObjectType[PartialServiceVertexParam]( - serviceColumn.columnName, - fields = List( - InputField("id", s2TypeToScalarType(serviceColumn.columnType)), - InputField("timestamp", OptionInputType(LongType)), - InputField("props", OptionInputType(inputParialVertexParamType)) - ) - ) - - InputField(serviceColumn.columnName, tpe) - } - - val vertexParamType = InputObjectType[Vector[PartialServiceVertexParam]]( - s"${service.serviceName}_column", - description = "desc here", - fields = columnArgs.toList - ) - - Argument(service.serviceName, vertexParamType) - } - - lazy val verticesArg = repo.allServices.flatMap { service => - service.serviceColumns.map { serviceColumn => - val inputParialVertexParamType = makeInputPartialVertexParamType(service, serviceColumn) - Argument(serviceColumn.columnName, OptionInputType(ListInputType(inputParialVertexParamType))) - } - } - - lazy val EdgeArg = repo.allLabels.map { label => - val inputPartialEdgeParamType = makeInputPartialEdgeParamType(label) - Argument(label.label, OptionInputType(inputPartialEdgeParamType)) - } - - lazy val EdgesArg = repo.allLabels.map { label => - val inputPartialEdgeParamType = makeInputPartialEdgeParamType(label) - Argument(label.label, OptionInputType(ListInputType(inputPartialEdgeParamType))) - } - - lazy val serviceOptArgs = List( - "compressionAlgorithm" -> CompressionAlgorithmType, - "cluster" -> StringType, - "hTableName" -> StringType, - "preSplitSize" -> IntType, - "hTableTTL" -> IntType - ).map { case (name, _type) => Argument(name, OptionInputType(_type)) } - - - lazy val labelRequiredArg = List( - "sourceService" -> InputLabelServiceType, - "targetService" -> InputLabelServiceType - ).map { case (name, _type) => Argument(name, _type) } - - lazy val labelOptsArgs = List( - "serviceName" -> ServiceListType, - "consistencyLevel" -> ConsistencyLevelType, - "isDirected" -> BooleanType, - "isAsync" -> BooleanType, - "schemaVersion" -> StringType - ).map { case (name, _type) => Argument(name, OptionInputType(_type)) } - - lazy val ServiceMutationResponseType = makeMutationResponseType[Service]( - "CreateService", - "desc here", - ServiceType - ) - - lazy val ServiceColumnMutationResponseType = makeMutationResponseType[ServiceColumn]( - "CreateServiceColumn", - "desc here", - ServiceColumnType - ) - - lazy val LabelMutationResponseType = makeMutationResponseType[Label]( - "CreateLabel", - "desc here", - LabelType - ) - - lazy val MutateResponseType = deriveObjectType[GraphRepository, MutateResponse]( - ObjectTypeName("MutateResponse"), - ObjectTypeDescription("desc here"), - AddFields( - Field("isSuccess", BooleanType, resolve = c => c.value.isSuccess) - ) - ) - - def makeMutationResponseType[T](name: String, desc: String, tpe: ObjectType[_, T]) = { - ObjectType( - name, - desc, - () => fields[Unit, MutationResponse[T]]( - Field("isSuccess", - BooleanType, - resolve = _.value.result.isSuccess - ), - Field("message", - StringType, - resolve = _.value.result match { - case Success(_) => s"Created successful" - case Failure(ex) => ex.getMessage - } - ), - Field("created", - OptionType(tpe), - resolve = _.value.result.toOption - ) - ) - ) - } - - lazy val vertexIdField: Field[GraphRepository, Any] = Field( - "id", - PlayJsonPolyType.PolyType, - description = Some("desc here"), - resolve = _.value match { - case v: PartialServiceParam => v.vid - case _ => throw new RuntimeException("dead code") - } - ) - - lazy val tsField: Field[GraphRepository, Any] = - Field("timestamp", - LongType, - description = Option("desc here"), - resolve = _.value match { - case e: S2EdgeLike => e.ts - case _ => throw new RuntimeException("dead code") - }) - - def makeEdgePropFields(edgeFieldNameWithTypes: List[(String, String)]): List[Field[GraphRepository, Any]] = { - def makeField[A](name: String, cType: String, tpe: ScalarType[A]): Field[GraphRepository, Any] = - Field(name, OptionType(tpe), description = Option("desc here"), resolve = _.value match { - case e: S2EdgeLike => - val innerVal = name match { - case "from" => e.srcForVertex.innerId - case "to" => e.tgtForVertex.innerId - case _ => e.propertyValue(name).get.innerVal - } - - JSONParser.innerValToAny(innerVal, cType).asInstanceOf[A] - - case _ => throw new RuntimeException("dead code") - }) - - edgeFieldNameWithTypes.map { case (cName, cType) => - cType match { - case "boolean" | "bool" => makeField[Boolean](cName, cType, BooleanType) - case "string" | "str" | "s" => makeField[String](cName, cType, StringType) - case "int" | "integer" | "i" | "int32" | "integer32" => makeField[Int](cName, cType, IntType) - case "long" | "l" | "int64" | "integer64" => makeField[Long](cName, cType, LongType) - case "double" | "d" | "float64" | "float" | "f" | "float32" => makeField[Double](cName, cType, FloatType) - case _ => throw new RuntimeException(s"Cannot support data type: ${cType}") - } - } - } - - // ex: KakaoFavorites - lazy val serviceVertexFields: List[Field[GraphRepository, Any]] = repo.allServices.map { service => - val serviceId = service.id.get - val connectedLabels = repo.allLabels.filter { lb => - lb.srcServiceId == serviceId || lb.tgtServiceId == serviceId - }.distinct - - // label connected on services, friends, post - lazy val connectedLabelFields: List[Field[GraphRepository, Any]] = connectedLabels.map { label => - val labelColumns = List("from" -> label.srcColumnType, "to" -> label.tgtColumnType) - val labelProps = label.labelMetas.map { lm => lm.name -> lm.dataType } - - lazy val EdgeType = ObjectType(label.label, () => fields[GraphRepository, Any](edgeFields ++ connectedLabelFields: _*)) - lazy val edgeFields: List[Field[GraphRepository, Any]] = tsField :: makeEdgePropFields(labelColumns ++ labelProps) - lazy val edgeTypeField: Field[GraphRepository, Any] = Field( - label.label, - ListType(EdgeType), - arguments = DirArg :: Nil, - description = Some("edges"), - resolve = { c => - val dir = c.argOpt("direction").getOrElse("out") - - val vertex: S2VertexLike = c.value match { - case v: S2VertexLike => v - case e: S2Edge => if (dir == "out") e.tgtVertex else e.srcVertex - case vp: PartialServiceParam => - if (dir == "out") c.ctx.partialServiceParamToVertex(label.tgtColumn, vp) - else c.ctx.partialServiceParamToVertex(label.srcColumn, vp) - } - - c.ctx.getEdges(vertex, label, dir) - } - ) - - edgeTypeField - } - - lazy val VertexType = ObjectType( - s"${service.serviceName}", - fields[GraphRepository, Any](vertexIdField +: connectedLabelFields: _*) - ) - - Field( - service.serviceName, - ListType(VertexType), - arguments = List( - Argument("id", OptionInputType(PlayJsonPolyType.PolyType)), - Argument("ids", OptionInputType(ListInputType(PlayJsonPolyType.PolyType))) - ), - description = Some(s"serviceName: ${service.serviceName}"), - resolve = { c => - val id = c.argOpt[JsValue]("id").toSeq - val ids = c.argOpt[List[JsValue]]("ids").toList.flatten - val svc = c.ctx.findServiceByName(service.serviceName).get - - (id ++ ids).map { vid => PartialServiceParam(svc, vid) } - } - ): Field[GraphRepository, Any] - } - - lazy val serviceField: Field[GraphRepository, Any] = Field( - "Services", - ListType(ServiceType), - description = Option("desc here"), - arguments = List(ServiceNameArg), - resolve = { c => - c.argOpt[String]("name") match { - case Some(name) => c.ctx.allServices.filter(_.serviceName == name) - case None => c.ctx.allServices - } - } - ) - - lazy val serviceColumnField: Field[GraphRepository, Any] = Field( - "ServiceColumn", - ListType(ServiceColumnType), - description = Option("desc here"), - arguments = List(ServiceNameRawArg, ColumnNameArg, PropArg), - resolve = { c => - c.argOpt[String]("name") match { - case Some(name) => c.ctx.allServiceColumns.filter(_.columnName == name) - case None => c.ctx.allServiceColumns - } - } - ) - - lazy val labelField: Field[GraphRepository, Any] = Field( - "Labels", - ListType(LabelType), - description = Option("desc here"), - arguments = List(LabelNameArg), - resolve = { c => - c.argOpt[String]("name") match { - case Some(name) => c.ctx.allLabels.filter(_.label == name) - case None => c.ctx.allLabels - } - } - ) - - /** - * Query fields - * Provide s2graph query API - * - * - Fields is created(or changed) for metadata is changed. - */ - lazy val queryFields = Seq(serviceField, labelField) ++ serviceVertexFields - - /** - * Mutation fields - * Provide s2graph management API - * - * - createService - * - createLabel - * - addEdge - * - addEdges - */ - lazy val mutationFields: List[Field[GraphRepository, Any]] = List( - Field("createService", - ServiceMutationResponseType, - arguments = NameArg :: serviceOptArgs, - resolve = c => MutationResponse(c.ctx.createService(c.args)) - ), - Field("createServiceColumn", - ServiceColumnMutationResponseType, - arguments = List(ServiceNameRawArg, Argument("columnName", StringType), ColumnTypeArg, PropArg), - resolve = c => MutationResponse(c.ctx.createServiceColumn(c.args)) - ), - Field("createLabel", - LabelMutationResponseType, - arguments = NameArg :: PropArg :: IndicesArg :: labelRequiredArg ::: labelOptsArgs, - resolve = c => MutationResponse(c.ctx.createLabel(c.args)) - ), - Field("addVertex", - OptionType(MutateResponseType), - arguments = VertexArg, - resolve = c => c.ctx.addVertex(c.args) - ), - Field("addVertices", - ListType(MutateResponseType), - arguments = verticesArg, - resolve = c => c.ctx.addVertices(c.args) - ), - Field("addEdge", - OptionType(MutateResponseType), - arguments = EdgeArg, - resolve = c => c.ctx.addEdge(c.args) - ), - Field("addEdges", - ListType(MutateResponseType), - arguments = EdgesArg, - resolve = c => c.ctx.addEdges(c.args) - ) - ) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/SangriaPlayJsonScalarType.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/SangriaPlayJsonScalarType.scala b/s2graphql/src/main/scala/SangriaPlayJsonScalarType.scala deleted file mode 100644 index 8421b8a..0000000 --- a/s2graphql/src/main/scala/SangriaPlayJsonScalarType.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph - -import sangria.ast._ -import sangria.schema._ -import sangria.validation.ValueCoercionViolation - -// https://gist.github.com/OlegIlyenko/5b96f4b54f656aac226d3c4bc33fd2a6 - -object PlayJsonPolyType { - - import sangria.ast - import sangria.schema._ - import play.api.libs.json._ - - case object JsonCoercionViolation extends ValueCoercionViolation("Not valid JSON") - - def scalarTypeToJsValue(v: sangria.ast.Value): JsValue = v match { - case v: IntValue => JsNumber(v.value) - case v: BigIntValue => JsNumber(BigDecimal(v.value.bigInteger)) - case v: FloatValue => JsNumber(v.value) - case v: BigDecimalValue => JsNumber(v.value) - case v: StringValue => JsString(v.value) - case v: BooleanValue => JsBoolean(v.value) - case v: ListValue => JsNull - case v: VariableValue => JsNull - case v: NullValue => JsNull - case v: ObjectValue => JsNull - } - - implicit val PolyType = ScalarType[JsValue]("Poly", - description = Some("Type Poly = String | Number | Boolean"), - coerceOutput = (value, _) â value match { - case JsString(s) => s - case JsNumber(n) => n - case JsBoolean(b) => b - case JsNull => null - case _ => value - }, - coerceUserInput = { - case v: String => Right(JsString(v)) - case v: Boolean => Right(JsBoolean(v)) - case v: Int => Right(JsNumber(v)) - case v: Long => Right(JsNumber(v)) - case v: Float => Right(JsNumber(v.toDouble)) - case v: Double => Right(JsNumber(v)) - case v: BigInt => Right(JsNumber(BigDecimal(v))) - case v: BigDecimal => Right(JsNumber(v)) - case _ => Left(JsonCoercionViolation) - }, - coerceInput = { - case value: ast.StringValue => Right(JsString(value.value)) - case value: ast.IntValue => Right(JsNumber(value.value)) - case value: ast.FloatValue => Right(JsNumber(value.value)) - case value: ast.BigIntValue => Right(JsNumber(BigDecimal(value.value.bigInteger))) - case _ => Left(JsonCoercionViolation) - }) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/SchemaDef.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/SchemaDef.scala b/s2graphql/src/main/scala/SchemaDef.scala deleted file mode 100644 index 6829040..0000000 --- a/s2graphql/src/main/scala/SchemaDef.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph - -/** - * S2Graph GraphQL schema. - * - * When a Label or Service is created, the GraphQL schema is created dynamically. - */ -class SchemaDef(s2Type: S2Type) { - - import sangria.schema._ - - val S2QueryType = ObjectType[GraphRepository, Any]("Query", fields(s2Type.queryFields: _*)) - - val S2MutationType = ObjectType[GraphRepository, Any]("Mutation", fields(s2Type.mutationFields: _*)) - - val S2GraphSchema = Schema(S2QueryType, Option(S2MutationType)) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala new file mode 100644 index 0000000..3365593 --- /dev/null +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.graphql + +import java.util.concurrent.Executors + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.StatusCodes._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ +import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.S2Graph +import org.apache.s2graph.core.utils.SafeUpdateCache +import org.apache.s2graph.graphql.repository.GraphRepository +import org.apache.s2graph.graphql.types._ +import sangria.ast.Document +import sangria.execution._ +import sangria.marshalling.sprayJson._ +import sangria.parser.QueryParser +import sangria.schema.Schema +import spray.json.{JsObject, JsString} + +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success, Try} + +object GraphQLServer { + + // Init s2graph + val numOfThread = Runtime.getRuntime.availableProcessors() + val threadPool = Executors.newFixedThreadPool(numOfThread * 2) + + implicit val ec = ExecutionContext.fromExecutor(threadPool) + + val config = ConfigFactory.load() + val s2graph = new S2Graph(config) + val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1) + val s2Repository = new GraphRepository(s2graph) + val schemaCache = new SafeUpdateCache[Schema[GraphRepository, Any]]("schema", maxSize = 1, ttl = schemaCacheTTL) + + def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + + val spray.json.JsObject(fields) = requestJSON + val spray.json.JsString(query) = fields("query") + + val operation = fields.get("operationName") collect { + case spray.json.JsString(op) => op + } + + val vars = fields.get("variables") match { + case Some(obj: spray.json.JsObject) => obj + case _ => spray.json.JsObject.empty + } + + QueryParser.parse(query) match { + case Success(queryAst) => complete(executeGraphQLQuery(queryAst, operation, vars)) + case Failure(error) => complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage))) + } + } + + /** + * In development mode(schemaCacheTTL = -1), + * a new schema is created for each request. + */ + println(s"schemaCacheTTL: ${schemaCacheTTL}") + + private def createNewSchema(): Schema[GraphRepository, Any] = { + println(s"Schema updated: ${System.currentTimeMillis()}") + val newSchema = new SchemaDef(s2Repository).S2GraphSchema + // println(SchemaRenderer.renderSchema(newSchema)) + println("-" * 80) + newSchema + } + + private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = { + val s2schema = schemaCache.withCache("s2Schema")(createNewSchema()) + + Executor.execute( + s2schema, + query, + s2Repository, + variables = vars, + operationName = op + ) + .map((res: spray.json.JsValue) => OK -> res) + .recover { + case error: QueryAnalysisError => BadRequest -> error.resolveError + case error: ErrorWithResolver => InternalServerError -> error.resolveError + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala new file mode 100644 index 0000000..d080477 --- /dev/null +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.graphql + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ +import akka.stream.ActorMaterializer + +import scala.Console._ +import scala.concurrent.Await +import scala.language.postfixOps + +object Server extends App { + + implicit val actorSystem = ActorSystem("s2graphql-server") + implicit val materializer = ActorMaterializer() + + import actorSystem.dispatcher + + import scala.concurrent.duration._ + + println("Starting GRAPHQL server...") + + val route: Route = + (post & path("graphql")) { + entity(as[spray.json.JsValue])(GraphQLServer.endpoint) + } ~ { + getFromResource("graphiql.html") + } + + val port = sys.props.get("http.port").fold(8000)(_.toInt) + Http().bindAndHandle(route, "0.0.0.0", port) + + + def shutdown(): Unit = { + println("Terminating...") + actorSystem.terminate() + Await.result(actorSystem.whenTerminated, 10 seconds) + + println("Terminated.") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/org/apache/s2graph/graphql/marshaller/package.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/marshaller/package.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/marshaller/package.scala new file mode 100644 index 0000000..28579db --- /dev/null +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/marshaller/package.scala @@ -0,0 +1,102 @@ +package org.apache.s2graph.graphql + +import org.apache.s2graph.core.Management.JsonModel._ +import org.apache.s2graph.graphql.types.S2Type._ +import org.apache.s2graph.graphql.types.S2ManagementType._ +import sangria.marshalling._ + +package object marshaller { + + implicit object IndexFromInput extends FromInput[Index] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val input = node.asInstanceOf[Map[String, Any]] + Index(input("name").asInstanceOf[String], input("propNames").asInstanceOf[Seq[String]]) + } + } + + implicit object PropFromInput extends FromInput[Prop] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val input = node.asInstanceOf[Map[String, String]] + Prop(input("name"), input("defaultValue"), input("dataType")) + } + } + + implicit object LabelServiceFromInput extends FromInput[LabelServiceProp] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val input = node.asInstanceOf[Map[String, String]] + LabelServiceProp(input("name"), input("columnName"), input("dataType")) + } + } + + implicit object PartialServiceVertexParamFromInput extends FromInput[Vector[PartialServiceVertexParam]] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val inputMap = node.asInstanceOf[Map[String, marshaller.Node]] + + val ret = inputMap.toVector.map { case (columnName, node) => + val param = PartialVertexFromInput.fromResult(node) + PartialServiceVertexParam(columnName, param) + } + + ret + } + } + + implicit object PartialVertexFromInput extends FromInput[PartialVertexParam] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + + val _inputMap = node.asInstanceOf[Option[Map[String, Any]]] + val inputMap = _inputMap.get + + val id = inputMap("id") + val ts = inputMap.get("timestamp") match { + case Some(Some(v)) => v.asInstanceOf[Long] + case _ => System.currentTimeMillis() + } + val props = inputMap.get("props") match { + case Some(Some(v)) => v.asInstanceOf[Map[String, Option[Any]]].filter(_._2.isDefined).mapValues(_.get) + case _ => Map.empty[String, Any] + } + + PartialVertexParam(ts, id, props) + } + } + + implicit object PartialEdgeFromInput extends FromInput[PartialEdgeParam] { + val marshaller = CoercedScalaResultMarshaller.default + + def fromResult(node: marshaller.Node) = { + val inputMap = node.asInstanceOf[Map[String, Any]] + + val from = inputMap("from") + val to = inputMap("to") + + val ts = inputMap.get("timestamp") match { + case Some(Some(v)) => v.asInstanceOf[Long] + case _ => System.currentTimeMillis() + } + + val dir = inputMap.get("direction") match { + case Some(Some(v)) => v.asInstanceOf[String] + case _ => "out" + } + + val props = inputMap.get("props") match { + case Some(Some(v)) => v.asInstanceOf[Map[String, Option[Any]]].filter(_._2.isDefined).mapValues(_.get) + case _ => Map.empty[String, Any] + } + + PartialEdgeParam(ts, from, to, dir, props) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala new file mode 100644 index 0000000..c3712d4 --- /dev/null +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.graphql.repository + +import org.apache.s2graph.core.Management.JsonModel._ +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.storage.MutateResponse +import org.apache.s2graph.core.types._ +import org.apache.s2graph.graphql.types.S2ManagementType._ +import org.apache.s2graph.graphql.types.S2Type._ +import sangria.schema.{Action, Args} + +import scala.concurrent._ +import scala.util.{Failure, Try} + + +/** + * + * @param graph + */ +class GraphRepository(val graph: S2GraphLike) { + val management = graph.management + val parser = new RequestParser(graph) + + implicit val ec = graph.ec + + def partialServiceParamToVertex(column: ServiceColumn, param: PartialServiceParam): S2VertexLike = { + val vid = JSONParser.toInnerVal(param.vid, column.columnType, column.schemaVersion) + graph.toVertex(param.service.serviceName, column.columnName, vid) + } + + def partialVertexParamToS2Vertex(serviceName: String, columnName: String, param: PartialVertexParam): S2VertexLike = { + graph.toVertex( + serviceName = serviceName, + columnName = columnName, + id = param.id, + props = param.props, + ts = param.ts) + } + + def partialEdgeParamToS2Edge(labelName: String, param: PartialEdgeParam): S2EdgeLike = { + graph.toEdge( + srcId = param.from, + tgtId = param.to, + labelName = labelName, + props = param.props, + direction = param.direction + ) + } + + def addVertex(args: Args): Future[Option[MutateResponse]] = { + val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => + val innerMap = args.arg[Vector[PartialServiceVertexParam]](serviceName) + val ret = innerMap.map { param => + partialVertexParamToS2Vertex(serviceName, param.columnName, param.vertexParam) + } + + ret + } + + graph.mutateVertices(vertices, withWait = true).map(_.headOption) + } + + def addVertices(args: Args): Future[Seq[MutateResponse]] = { + val vertices: Seq[S2VertexLike] = args.raw.keys.toList.flatMap { serviceName => + val innerMap = args.arg[Map[String, Vector[PartialVertexParam]]](serviceName) + + innerMap.flatMap { case (columnName, params) => + params.map { param => + partialVertexParamToS2Vertex(serviceName, columnName, param) + } + } + } + graph.mutateVertices(vertices, withWait = true) + } + + def addEdges(args: Args): Future[Seq[MutateResponse]] = { + val edges: Seq[S2EdgeLike] = args.raw.keys.toList.flatMap { labelName => + val params = args.arg[Vector[PartialEdgeParam]](labelName) + params.map(param => partialEdgeParamToS2Edge(labelName, param)) + } + + graph.mutateEdges(edges, withWait = true) + } + + def addEdge(args: Args): Future[Option[MutateResponse]] = { + val edges: Seq[S2EdgeLike] = args.raw.keys.toList.map { labelName => + val param = args.arg[PartialEdgeParam](labelName) + partialEdgeParamToS2Edge(labelName, param) + } + + graph.mutateEdges(edges, withWait = true).map(_.headOption) + } + + def getVertex(vertex: S2VertexLike): Future[Seq[S2VertexLike]] = { + val f = graph.getVertices(Seq(vertex)) + f.foreach{ a => + println(a) + } + f + } + + def getEdges(vertex: S2VertexLike, label: Label, _dir: String): Future[Seq[S2EdgeLike]] = { + val dir = GraphUtil.directions(_dir) + val labelWithDir = LabelWithDirection(label.id.get, dir) + val step = Step(Seq(QueryParam(labelWithDir))) + val q = Query(Seq(vertex), steps = Vector(step)) + + graph.getEdges(q).map(_.edgeWithScores.map(_.edge)) + } + + def createService(args: Args): Try[Service] = { + val serviceName = args.arg[String]("name") + + Service.findByName(serviceName) match { + case Some(_) => Failure(new RuntimeException(s"Service (${serviceName}) already exists")) + case None => + val cluster = args.argOpt[String]("cluster").getOrElse(parser.DefaultCluster) + val hTableName = args.argOpt[String]("hTableName").getOrElse(s"${serviceName}-${parser.DefaultPhase}") + val preSplitSize = args.argOpt[Int]("preSplitSize").getOrElse(1) + val hTableTTL = args.argOpt[Int]("hTableTTL") + val compressionAlgorithm = args.argOpt[String]("compressionAlgorithm").getOrElse(parser.DefaultCompressionAlgorithm) + + val serviceTry = management + .createService(serviceName, + cluster, + hTableName, + preSplitSize, + hTableTTL, + compressionAlgorithm) + + serviceTry + } + } + + def createServiceColumn(args: Args): Try[ServiceColumn] = { + val serviceName = args.arg[String]("serviceName") + val columnName = args.arg[String]("columnName") + val columnType = args.arg[String]("columnType") + val props = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty) + + Try { management.createServiceColumn(serviceName, columnName, columnType, props) } + } + + def deleteServiceColumn(args: Args): Try[ServiceColumn] = { + val serviceName = args.arg[String]("serviceName") + val columnName = args.arg[String]("columnName") + + Management.deleteColumn(serviceName, columnName) + } + + def createLabel(args: Args): Try[Label] = { + val labelName = args.arg[String]("name") + + val srcServiceProp = args.arg[LabelServiceProp]("sourceService") + val tgtServiceProp = args.arg[LabelServiceProp]("targetService") + + val allProps = args.argOpt[Vector[Prop]]("props").getOrElse(Vector.empty) + val indices = args.argOpt[Vector[Index]]("indices").getOrElse(Vector.empty) + + val serviceName = args.argOpt[String]("serviceName").getOrElse(tgtServiceProp.name) + val consistencyLevel = args.argOpt[String]("consistencyLevel").getOrElse("weak") + val hTableName = args.argOpt[String]("hTableName") + val hTableTTL = args.argOpt[Int]("hTableTTL") + val schemaVersion = args.argOpt[String]("schemaVersion").getOrElse(HBaseType.DEFAULT_VERSION) + val isAsync = args.argOpt("isAsync").getOrElse(false) + val compressionAlgorithm = args.argOpt[String]("compressionAlgorithm").getOrElse(parser.DefaultCompressionAlgorithm) + val isDirected = args.argOpt[Boolean]("isDirected").getOrElse(true) + val options = args.argOpt[String]("options") // TODO: support option type + + val labelTry: scala.util.Try[Label] = management.createLabel( + labelName, + srcServiceProp.name, + srcServiceProp.columnName, + srcServiceProp.dataType, + tgtServiceProp.name, + tgtServiceProp.columnName, + tgtServiceProp.dataType, + isDirected, + serviceName, + indices, + allProps, + consistencyLevel, + hTableName, + hTableTTL, + schemaVersion, + isAsync, + compressionAlgorithm, + options + ) + + labelTry + } + + def deleteLabel(args: Args): Try[Label] = { + val labelName = args.arg[String]("name") + + Management.deleteLabel(labelName) + } + + def allServices: List[Service] = Service.findAll() + + def allServiceColumns: List[ServiceColumn] = ServiceColumn.findAll() + + def findServiceByName(name: String): Option[Service] = Service.findByName(name) + + def allLabels: List[Label] = Label.findAll() + + def findLabelByName(name: String): Option[Label] = Label.findByName(name) + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bab13a32/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2ManagementType.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2ManagementType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2ManagementType.scala new file mode 100644 index 0000000..94bd466 --- /dev/null +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2ManagementType.scala @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.graphql.types + +import org.apache.s2graph.core.Management.JsonModel._ +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.storage.MutateResponse +import org.apache.s2graph.graphql._ +import org.apache.s2graph.graphql.repository.GraphRepository +import play.api.libs.json.JsValue +import sangria.marshalling._ +import sangria.schema._ + +import scala.language.existentials +import scala.util.{Failure, Success, Try} + +import org.apache.s2graph.graphql.marshaller._ + +object S2ManagementType { + + import sangria.schema._ + + case class LabelServiceProp(name: String, + columnName: String, + dataType: String) + + case class MutationResponse[T](result: Try[T]) + + def makeMutationResponseType[T](name: String, desc: String, tpe: ObjectType[_, T]) = { + ObjectType( + name, + desc, + () => fields[Unit, MutationResponse[T]]( + Field("isSuccess", + BooleanType, + resolve = _.value.result.isSuccess + ), + Field("message", + StringType, + resolve = _.value.result match { + case Success(_) => s"Mutation successful" + case Failure(ex) => ex.getMessage + } + ), + Field("response", + OptionType(tpe), + resolve = _.value.result.toOption + ) + ) + ) + } +} + +class S2ManagementType(repo: GraphRepository) { + + import S2ManagementType._ + + import sangria.macros.derive._ + + val NameArg = Argument("name", StringType, description = "desc here") + + lazy val ServiceNameArg = Argument("name", OptionInputType(ServiceListType), description = "desc here") + + lazy val ServiceNameRawArg = Argument("serviceName", ServiceListType, description = "desc here") + + lazy val ColumnNameArg = Argument("columnName", OptionInputType(ServiceColumnListType), description = "desc here") + + lazy val ColumnTypeArg = Argument("columnType", DataTypeType, description = "desc here") + + lazy val LabelNameArg = Argument("name", OptionInputType(LabelListType), description = "desc here") + + lazy val PropArg = Argument("props", OptionInputType(ListInputType(InputPropType)), description = "desc here") + + lazy val IndicesArg = Argument("indices", OptionInputType(ListInputType(InputIndexType)), description = "desc here") + + lazy val ServiceType = deriveObjectType[GraphRepository, Service]( + ObjectTypeName("Service"), + ObjectTypeDescription("desc here"), + RenameField("serviceName", "name"), + AddFields( + Field("serviceColumns", ListType(ServiceColumnType), resolve = c => c.value.serviceColumns(false).toList) + ) + ) + + lazy val ServiceColumnType = deriveObjectType[GraphRepository, ServiceColumn]( + ObjectTypeName("ServiceColumn"), + ObjectTypeDescription("desc here"), + RenameField("columnName", "name"), + AddFields( + Field("props", ListType(ColumnMetaType), + resolve = c => c.value.metas.filter(ColumnMeta.isValid)) + ) + ) + + val dummyEnum = EnumValue("_", value = "_") + + lazy val ServiceListType = EnumType( + s"ServiceList", + description = Option("desc here"), + values = + dummyEnum +: repo.allServices.map { service => + EnumValue(service.serviceName, value = service.serviceName) + } + ) + + lazy val ServiceColumnListType = EnumType( + s"ServiceColumnList", + description = Option("desc here"), + values = + dummyEnum +: repo.allServiceColumns.map { serviceColumn => + EnumValue(serviceColumn.columnName, value = serviceColumn.columnName) + } + ) + + lazy val LabelListType = EnumType( + s"LabelList", + description = Option("desc here"), + values = + dummyEnum +: repo.allLabels.map { label => + EnumValue(label.label, value = label.label) + } + ) + + lazy val InputLabelServiceType = InputObjectType[LabelServiceProp]( + "LabelServiceProp", + description = "desc here", + fields = List( + InputField("name", ServiceListType), + InputField("columnName", StringType), + InputField("dataType", DataTypeType) + ) + ) + + + lazy val ServiceMutationResponseType = makeMutationResponseType[Service]( + "CreateService", + "desc here", + ServiceType + ) + + lazy val ServiceColumnMutationResponseType = makeMutationResponseType[ServiceColumn]( + "CreateServiceColumn", + "desc here", + ServiceColumnType + ) + + lazy val LabelMutationResponseType = makeMutationResponseType[Label]( + "CreateLabel", + "desc here", + LabelType + ) + + lazy val serviceColumnField: Field[GraphRepository, Any] = Field( + "ServiceColumn", + ListType(ServiceColumnType), + description = Option("desc here"), + arguments = List(ServiceNameRawArg, ColumnNameArg, PropArg), + resolve = { c => + c.argOpt[String]("name") match { + case Some(name) => c.ctx.allServiceColumns.filter(_.columnName == name) + case None => c.ctx.allServiceColumns + } + } + ) + + lazy val labelField: Field[GraphRepository, Any] = Field( + "Labels", + ListType(LabelType), + description = Option("desc here"), + arguments = List(LabelNameArg), + resolve = { c => + c.argOpt[String]("name") match { + case Some(name) => c.ctx.allLabels.filter(_.label == name) + case None => c.ctx.allLabels + } + } + ) + + val serviceOptArgs = List( + "compressionAlgorithm" -> CompressionAlgorithmType, + "cluster" -> StringType, + "hTableName" -> StringType, + "preSplitSize" -> IntType, + "hTableTTL" -> IntType + ).map { case (name, _type) => Argument(name, OptionInputType(_type)) } + + + lazy val labelRequiredArg = List( + "sourceService" -> InputLabelServiceType, + "targetService" -> InputLabelServiceType + ).map { case (name, _type) => Argument(name, _type) } + + val labelOptsArgs = List( + "serviceName" -> ServiceListType, + "consistencyLevel" -> ConsistencyLevelType, + "isDirected" -> BooleanType, + "isAsync" -> BooleanType, + "schemaVersion" -> StringType + ).map { case (name, _type) => Argument(name, OptionInputType(_type)) } + + + /** + * Management query + */ + + lazy val serviceField: Field[GraphRepository, Any] = Field( + "Services", + ListType(ServiceType), + description = Option("desc here"), + arguments = List(ServiceNameArg), + resolve = { c => + c.argOpt[String]("name") match { + case Some(name) => c.ctx.allServices.filter(_.serviceName == name) + case None => c.ctx.allServices + } + } + ) + + lazy val queryFields: List[Field[GraphRepository, Any]] = List(serviceField, labelField) + + /** + * Mutation fields + * Provide s2graph management API + * + * - createService + * - createLabel + * - ... + */ + lazy val mutationFields: List[Field[GraphRepository, Any]] = List( + Field("createService", + ServiceMutationResponseType, + arguments = NameArg :: serviceOptArgs, + resolve = c => MutationResponse(c.ctx.createService(c.args)) + ), + Field("createServiceColumn", + ServiceColumnMutationResponseType, + arguments = List(ServiceNameRawArg, Argument("columnName", StringType), ColumnTypeArg, PropArg), + resolve = c => MutationResponse(c.ctx.createServiceColumn(c.args)) + ), + Field("deleteServiceColumn", + ServiceColumnMutationResponseType, + arguments = List(ServiceNameRawArg, Argument("columnName", ServiceColumnListType)), + resolve = c => MutationResponse(c.ctx.deleteServiceColumn(c.args)) + ), + Field("createLabel", + LabelMutationResponseType, + arguments = NameArg :: PropArg :: IndicesArg :: labelRequiredArg ::: labelOptsArgs, + resolve = c => MutationResponse(c.ctx.createLabel(c.args)) + ), + Field("deleteLabel", + LabelMutationResponseType, + arguments = LabelNameArg :: Nil, + resolve = c => MutationResponse(c.ctx.deleteLabel(c.args)) + ) + ) +}
