Repository: incubator-s2graph Updated Branches: refs/heads/master 35a369f55 -> 1881df05c
add content-type: application/graphql Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/23a22775 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/23a22775 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/23a22775 Branch: refs/heads/master Commit: 23a22775173b1e5d71124fdddcda7c96d8fb4927 Parents: 32eb344 Author: daewon <[email protected]> Authored: Thu Jun 14 18:08:42 2018 +0900 Committer: daewon <[email protected]> Committed: Thu Jun 14 18:08:42 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/graphql/GraphQLServer.scala | 73 ++++++------ .../org/apache/s2graph/graphql/HttpServer.scala | 112 ++++++++++++++++--- 2 files changed, 132 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/23a22775/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala index eee7c93..f71e16f 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -35,12 +35,13 @@ import sangria.ast.Document import sangria.execution._ import sangria.execution.deferred.DeferredResolver import sangria.marshalling.sprayJson._ -import sangria.parser.QueryParser +import sangria.parser.{QueryParser, SyntaxError} import sangria.schema.Schema -import spray.json.{JsBoolean, JsObject, JsString} +import spray.json._ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal import scala.util.{Failure, Success, 
Try} object GraphQLServer { @@ -63,7 +64,7 @@ object GraphQLServer { val schemaCache = new SafeUpdateCache(schemaConfig) - def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Try[Unit] = { val ret = Try { val spray.json.JsObject(fields) = requestJSON val spray.json.JsString(labelName) = fields("label") @@ -72,33 +73,7 @@ object GraphQLServer { s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint) } - ret match { - case Success(f) => complete(OK -> JsString("start")) - case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString))) - } - } - - def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { - val spray.json.JsObject(fields) = requestJSON - val spray.json.JsString(query) = fields("query") - - val operation = fields.get("operationName") collect { - case spray.json.JsString(op) => op - } - - val vars = fields.get("variables") match { - case Some(obj: spray.json.JsObject) => obj - case _ => spray.json.JsObject.empty - } - - QueryParser.parse(query) match { - case Success(queryAst) => - logger.info(queryAst.renderCompact) - complete(executeGraphQLQuery(queryAst, operation, vars)) - case Failure(error) => - logger.warn(error.getMessage, error) - complete(BadRequest -> spray.json.JsObject("error" -> JsString(error.getMessage))) - } + ret } /** @@ -108,17 +83,37 @@ object GraphQLServer { logger.info(s"schemaCacheTTL: ${schemaCacheTTL}") private def createNewSchema(): Schema[GraphRepository, Any] = { - logger.info(s"Schema updated: ${System.currentTimeMillis()}") val newSchema = new SchemaDef(s2Repository).S2GraphSchema - logger.info("-" * 80) + logger.info(s"Schema updated: ${System.currentTimeMillis()}") newSchema } - private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = { - val 
cacheKey = className + "s2Schema" - val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema()) + def formatError(error: Throwable): JsValue = error match { + case syntaxError: SyntaxError ⇒ + JsObject("errors" → JsArray( + JsObject( + "message" → JsString(syntaxError.getMessage), + "locations" → JsArray(JsObject( + "line" → JsNumber(syntaxError.originalError.position.line), + "column" → JsNumber(syntaxError.originalError.position.column)))))) + + case NonFatal(e) ⇒ formatError(e.toString) + case e ⇒ throw e + } + + def formatError(message: String): JsObject = + JsObject("errors" → JsArray(JsObject("message" → JsString(message)))) + + def onEvictSchema(o: AnyRef): Unit = { + logger.info("Schema Evicted") + } + + def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = { import GraphRepository._ + + val cacheKey = className + "s2Schema" + val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema()) val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher) Executor.execute( @@ -128,15 +123,15 @@ object GraphQLServer { variables = vars, operationName = op, deferredResolver = resolver - ) - .map((res: spray.json.JsValue) => OK -> res) + ).map((res: spray.json.JsValue) => OK -> res) .recover { case error: QueryAnalysisError => - logger.warn(error.getMessage, error) + logger.error("Error on execute", error) BadRequest -> error.resolveError case error: ErrorWithResolver => - logger.error(error.getMessage, error) + logger.error("Error on execute", error) InternalServerError -> error.resolveError } } } + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/23a22775/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala index 6f57cc4..1620f30 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,18 +19,29 @@ package org.apache.s2graph.graphql +import java.nio.charset.Charset + import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ +import akka.http.scaladsl.server.{Route, StandardRoute} import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Flow import org.slf4j.LoggerFactory +import sangria.parser.QueryParser +import spray.json._ import scala.concurrent.Await import scala.language.postfixOps +import scala.util._ +import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller, ToResponseMarshallable} +import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} +import akka.util.ByteString +import sangria.ast.Document +import sangria.renderer.{QueryRenderer, QueryRendererConfig} + +import scala.collection.immutable.Seq object Server extends App { val logger = LoggerFactory.getLogger(this.getClass) @@ -39,20 +50,57 @@ object Server extends App { implicit val materializer = ActorMaterializer() import system.dispatcher - import scala.concurrent.duration._ - val route: Flow[HttpRequest, HttpResponse, 
Any] = (post & path("graphql")) { - entity(as[spray.json.JsValue])(GraphQLServer.endpoint) - } ~ (post & path("updateEdgeFetcher")) { - entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher) - } ~ { - getFromResource("assets/graphiql.html") - } + import spray.json.DefaultJsonProtocol._ + + val route: Route = + get { + getFromResource("assets/graphiql.html") + } ~ (post & path("updateEdgeFetcher")) { + entity(as[JsValue]) { body => + GraphQLServer.updateEdgeFetcher(body) match { + case Success(_) => complete(StatusCodes.OK -> JsString("Update fetcher finished")) + case Failure(e) => + logger.error("Error on execute", e) + complete(StatusCodes.InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString))) + } + } + } ~ (post & path("graphql")) { + parameters('operationName.?, 'variables.?) { (operationNameParam, variablesParam) => + entity(as[Document]) { document ⇒ + variablesParam.map(parseJson) match { + case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject())) + case Some(Right(js)) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject)) + case Some(Left(e)) ⇒ + logger.error("Error on execute", e) + complete(StatusCodes.BadRequest -> GraphQLServer.formatError(e)) + } + } ~ entity(as[JsValue]) { body ⇒ + val fields = body.asJsObject.fields + + val query = fields.get("query").map(js => js.convertTo[String]) + val operationName = fields.get("operationName").filter(_ == null).map(_.convertTo[String]) + val variables = fields.get("variables").filter(_ == null) + + query.map(QueryParser.parse(_)) match { + case None ⇒ complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute")) + case Some(Failure(error)) ⇒ + logger.error("Error on execute", error) + complete(StatusCodes.BadRequest -> GraphQLServer.formatError(error)) + case Some(Success(document)) => variables match { + case Some(js) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, 
operationName, js.asJsObject)) + case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, JsObject())) + } + } + } + } + } val port = sys.props.get("http.port").fold(8000)(_.toInt) - logger.info(s"Starting GraphQL server... ${port}") + logger.info(s"Starting GraphQL server... $port") + Http().bindAndHandle(route, "0.0.0.0", port) def shutdown(): Unit = { @@ -63,4 +111,40 @@ object Server extends App { logger.info("Terminated.") } + + // Unmarshaller + + def unmarshallerContentTypes: Seq[ContentTypeRange] = mediaTypes.map(ContentTypeRange.apply) + + def mediaTypes: Seq[MediaType.WithFixedCharset] = + Seq(MediaType.applicationWithFixedCharset("graphql", HttpCharsets.`UTF-8`, "graphql")) + + implicit def documentMarshaller(implicit config: QueryRendererConfig = QueryRenderer.Compact): ToEntityMarshaller[Document] = { + Marshaller.oneOf(mediaTypes: _*) { + mediaType ⇒ + Marshaller.withFixedContentType(ContentType(mediaType)) { + json ⇒ HttpEntity(mediaType, QueryRenderer.render(json, config)) + } + } + } + + implicit val documentUnmarshaller: FromEntityUnmarshaller[Document] = { + Unmarshaller.byteStringUnmarshaller + .forContentTypes(unmarshallerContentTypes: _*) + .map { + case ByteString.empty ⇒ throw Unmarshaller.NoContentException + case data ⇒ + import sangria.parser.DeliveryScheme.Throw + QueryParser.parse(data.decodeString(Charset.forName("UTF-8"))) + } + } + + def parseJson(jsStr: String): Either[Throwable, JsValue] = { + val parsed = Try(jsStr.parseJson) + parsed match { + case Success(js) => Right(js) + case Failure(e) => Left(e) + } + } + }
