[ https://issues.apache.org/jira/browse/PIO-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648770#comment-16648770 ]
ASF GitHub Bot commented on PIO-31: ----------------------------------- takezoe closed pull request #474: [PIO-31] Move from spray to akka-http URL: https://github.com/apache/predictionio/pull/474 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/LICENSE.txt b/LICENSE.txt index 5b1d72d17..c83c2319a 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -409,7 +409,6 @@ Binary distribution bundles com.sun.jersey # jersey-server # 1.9 (https://github.com/jersey/jersey-1.x) javax.xml.bind # jaxb-api # 2.2.2 com.sun.xml.bind # jaxb-impl # 2.2.3-1 - org.jvnet.mimepull # mimepull # 1.9.5 (https://github.com/kohsuke/mimepull) which are available under the CDDL v1.1 license (https://glassfish.java.net/public/CDDL+GPL_1_1.html) @@ -1708,9 +1707,8 @@ Binary distribution bundles org.scala-lang # scala-reflect # 2.11.12 (http://scala-lang.org/) org.scala-lang # scalap # 2.11.12 (http://scala-lang.org/) org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 (http://scala-lang.org/) - org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.4 (http://scala-lang.org/) org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 (http://scala-lang.org/) - org.scala-lang.modules # scala-xml_2.11 # 1.0.3 (http://scala-lang.org/) + org.scala-lang.modules # scala-parser-combinators_2.11 # 1.1.0 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.5 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.6 (http://scala-lang.org/) @@ -1782,3 +1780,4 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. The following libraries are from the public domain. 
org.tukaani # xz # 1.0 (http://tukaani.org/xz/java.html) + org.reactivestreams # reactive-streams # 1.0.2 (http://www.reactive-streams.org/) diff --git a/common/build.sbt b/common/build.sbt index 19e4f0434..f9fd97bfe 100644 --- a/common/build.sbt +++ b/common/build.sbt @@ -20,9 +20,11 @@ import PIOBuild._ name := "apache-predictionio-common" libraryDependencies ++= Seq( - "io.spray" %% "spray-can" % "1.3.3", - "io.spray" %% "spray-routing" % "1.3.3", - "com.typesafe.akka" %% "akka-actor" % akkaVersion.value, - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion.value) + "com.typesafe.akka" %% "akka-actor" % akkaVersion.value, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion.value, + "com.typesafe.akka" %% "akka-http" % "10.1.5", + "org.json4s" %% "json4s-native" % json4sVersion.value, + "com.typesafe.akka" %% "akka-stream" % "2.5.12" +) pomExtra := childrenPomExtra.value diff --git a/common/src/main/resources/application.conf b/common/src/main/resources/application.conf index c47d909d4..f0e6c8ade 100644 --- a/common/src/main/resources/application.conf +++ b/common/src/main/resources/application.conf @@ -3,10 +3,3 @@ akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "INFO" } - -spray.can { - server { - verbose-error-messages = "on" - request-timeout = 35s - } -} diff --git a/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala b/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala new file mode 100644 index 000000000..62cb8de6b --- /dev/null +++ b/common/src/main/scala/org/apache/predictionio/akkahttpjson4s/Json4sSupport.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.akkahttpjson4s + +// Referenced from https://github.com/hseeberger/akka-http-json +// because of the difference of supported json4s version. +import java.lang.reflect.InvocationTargetException + +import akka.http.scaladsl.marshalling.{ Marshaller, ToEntityMarshaller } +import akka.http.scaladsl.model.ContentTypeRange +import akka.http.scaladsl.model.MediaType +import akka.http.scaladsl.model.MediaTypes.`application/json` +import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller } +import akka.util.ByteString +import org.json4s.{ Formats, MappingException, Serialization } +import scala.collection.immutable.Seq + +/** + * Automatic to and from JSON marshalling/unmarshalling using an in-scope *Json4s* protocol. + * + * Pretty printing is enabled if an implicit [[Json4sSupport.ShouldWritePretty.True]] is in scope. + */ +object Json4sSupport extends Json4sSupport { + + sealed abstract class ShouldWritePretty + + final object ShouldWritePretty { + final object True extends ShouldWritePretty + final object False extends ShouldWritePretty + } +} + +/** + * Automatic to and from JSON marshalling/unmarshalling using an in-scope *Json4s* protocol. + * + * Pretty printing is enabled if an implicit [[Json4sSupport.ShouldWritePretty.True]] is in scope. 
+ */ +trait Json4sSupport { + import Json4sSupport._ + + def unmarshallerContentTypes: Seq[ContentTypeRange] = + mediaTypes.map(ContentTypeRange.apply) + + def mediaTypes: Seq[MediaType.WithFixedCharset] = + List(`application/json`) + + private val jsonStringUnmarshaller = + Unmarshaller.byteStringUnmarshaller + .forContentTypes(unmarshallerContentTypes: _*) + .mapWithCharset { + case (ByteString.empty, _) => throw Unmarshaller.NoContentException + case (data, charset) => data.decodeString(charset.nioCharset.name) + } + + private val jsonStringMarshaller = + Marshaller.oneOf(mediaTypes: _*)(Marshaller.stringMarshaller) + + /** + * HTTP entity => `A` + * + * @tparam A type to decode + * @return unmarshaller for `A` + */ + implicit def unmarshaller[A: Manifest](implicit serialization: Serialization, + formats: Formats): FromEntityUnmarshaller[A] = + jsonStringUnmarshaller + .map(s => serialization.read(s)) + .recover { _ => _ => + { case MappingException(_, ite: InvocationTargetException) => throw ite.getCause } + } + + /** + * `A` => HTTP entity + * + * @tparam A type to encode, must be upper bounded by `AnyRef` + * @return marshaller for any `A` value + */ + implicit def marshaller[A <: AnyRef](implicit serialization: Serialization, + formats: Formats, + shouldWritePretty: ShouldWritePretty = + ShouldWritePretty.False): ToEntityMarshaller[A] = + shouldWritePretty match { + case ShouldWritePretty.False => + jsonStringMarshaller.compose(serialization.write[A]) + case ShouldWritePretty.True => + jsonStringMarshaller.compose(serialization.writePretty[A]) + } +} diff --git a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala index fa950aa0f..08ae09a9d 100644 --- a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala +++ b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala @@ -23,11 
+23,10 @@ package org.apache.predictionio.authentication * It is highly recommended to implement a stonger authentication mechanism */ +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.headers.HttpChallenge +import akka.http.scaladsl.server.{AuthenticationFailedRejection, Rejection, RequestContext} import com.typesafe.config.ConfigFactory -import spray.http.HttpRequest -import spray.routing.authentication._ -import spray.routing.{AuthenticationFailedRejection, RequestContext} - import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -42,19 +41,18 @@ trait KeyAuthentication { val param = "accessKey" } - def withAccessKeyFromFile: RequestContext => Future[Authentication[HttpRequest]] = { + def withAccessKeyFromFile: RequestContext => Future[Either[Rejection, HttpRequest]] = { ctx: RequestContext => - val accessKeyParamOpt = ctx.request.uri.query.get(ServerKey.param) + val accessKeyParamOpt = ctx.request.uri.query().get(ServerKey.param) Future { - val passedKey = accessKeyParamOpt.getOrElse { Left(AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, Nil)) + AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("", None))) } if (!ServerKey.authEnforced || passedKey.equals(ServerKey.get)) Right(ctx.request) else Left(AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, Nil)) + AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("", None))) } } diff --git a/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala b/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala index 9292e21e8..7880b13e6 100644 --- a/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala +++ b/common/src/main/scala/org/apache/predictionio/configuration/SSLConfiguration.scala @@ -15,19 +15,13 @@ * limitations under the License. 
*/ - package org.apache.predictionio.configuration -/** - * Created by ykhodorkovsky on 2/26/16. - */ - import java.io.FileInputStream import java.security.KeyStore -import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} import com.typesafe.config.ConfigFactory -import spray.io.ServerSSLEngineProvider +import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} trait SSLConfiguration { @@ -39,7 +33,6 @@ trait SSLConfiguration { private val keyAlias = serverConfig.getString("org.apache.predictionio.server.ssl-key-alias") private val keyStore = { - // Loading keystore from specified file val clientStore = KeyStore.getInstance("JKS") val inputStream = new FileInputStream( @@ -50,7 +43,7 @@ trait SSLConfiguration { } // Creating SSL context - implicit def sslContext: SSLContext = { + def sslContext: SSLContext = { val context = SSLContext.getInstance("TLS") val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) @@ -60,15 +53,4 @@ trait SSLConfiguration { context } - // provide implicit SSLEngine with some protocols - implicit def sslEngineProvider: ServerSSLEngineProvider = { - ServerSSLEngineProvider { engine => - engine.setEnabledCipherSuites(Array( - "TLS_RSA_WITH_AES_256_CBC_SHA", - "TLS_ECDH_ECDSA_WITH_RC4_128_SHA", - "TLS_RSA_WITH_AES_128_CBC_SHA")) - engine.setEnabledProtocols(Array("TLSv1", "TLSv1.2", "TLSv1.1")) - engine - } - } } diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala index 2447682fc..5642114f8 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala @@ -21,11 +21,7 @@ package org.apache.predictionio.workflow import java.io.Serializable import java.util.concurrent.TimeUnit -import akka.actor._ import 
akka.event.Logging -import akka.io.IO -import akka.pattern.ask -import akka.util.Timeout import com.github.nscala_time.time.Imports.DateTime import com.twitter.bijection.Injection import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator} @@ -34,7 +30,6 @@ import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer import grizzled.slf4j.Logging import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.predictionio.authentication.KeyAuthentication -import org.apache.predictionio.configuration.SSLConfiguration import org.apache.predictionio.controller.{Engine, Params, Utils, WithPrId} import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer} import org.apache.predictionio.data.storage.{EngineInstance, Storage} @@ -42,15 +37,23 @@ import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption import org.json4s._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write -import spray.can.Http -import spray.can.server.ServerSettings -import spray.http.MediaTypes._ -import spray.http._ -import spray.httpx.Json4sSupport -import spray.routing._ +import akka.actor._ +import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext} +import akka.http.scaladsl.Http.ServerBinding +import akka.http.scaladsl.model.ContentTypes._ +import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCodes} +import akka.http.scaladsl.server.Directives.complete +import akka.http.scaladsl.server.directives._ +import akka.http.scaladsl.server._ +import akka.pattern.ask +import akka.util.Timeout +import akka.http.scaladsl.server.Directives._ +import akka.stream.ActorMaterializer +import org.apache.predictionio.akkahttpjson4s.Json4sSupport._ +import org.apache.predictionio.configuration.SSLConfiguration import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import 
scala.language.existentials import scala.util.{Failure, Random, Success} @@ -177,18 +180,21 @@ object CreateServer extends Logging { "master") implicit val timeout = Timeout(5.seconds) master ? StartServer() - actorSystem.awaitTermination + + val f = actorSystem.whenTerminated + Await.ready(f, Duration.Inf) + } getOrElse { error(s"Invalid engine instance ID. Aborting server.") } } } - def createServerActorWithEngine[TD, EIN, PD, Q, P, A]( + def createPredictionServerWithEngine[TD, EIN, PD, Q, P, A]( sc: ServerConfig, engineInstance: EngineInstance, engine: Engine[TD, EIN, PD, Q, P, A], - engineLanguage: EngineLanguage.Value): ActorRef = { + engineLanguage: EngineLanguage.Value): PredictionServer[Q, P] = { val engineParams = engine.engineInstanceToEngineParams( engineInstance, sc.jsonExtractor) @@ -228,36 +234,50 @@ object CreateServer extends Logging { val serving = Doer(engine.servingClassMap(servingParamsWithName._1), servingParamsWithName._2) - actorSystem.actorOf( - Props( - classOf[ServerActor[Q, P]], - sc, - engineInstance, - engine, - engineLanguage, - engineParams.dataSourceParams._2, - engineParams.preparatorParams._2, - algorithms, - engineParams.algorithmParamsList.map(_._2), - models, - serving, - engineParams.servingParams._2)) + new PredictionServer( + sc, + engineInstance, + engine, + engineLanguage, + engineParams.dataSourceParams._2, + engineParams.preparatorParams._2, + algorithms, + engineParams.algorithmParamsList.map(_._2), + models, + serving, + engineParams.servingParams._2, + actorSystem) } } + +object EngineServerJson4sSupport { + implicit val serialization = org.json4s.jackson.Serialization + implicit def json4sFormats: Formats = DefaultFormats +} + class MasterActor ( sc: ServerConfig, engineInstance: EngineInstance, - engineFactoryName: String) extends Actor with SSLConfiguration with KeyAuthentication { + engineFactoryName: String) extends Actor with KeyAuthentication with SSLConfiguration { + val log = Logging(context.system, this) + 
implicit val system = context.system - var sprayHttpListener: Option[ActorRef] = None - var currentServerActor: Option[ActorRef] = None + implicit val materializer = ActorMaterializer() + + var currentServerBinding: Option[Future[ServerBinding]] = None var retry = 3 val serverConfig = ConfigFactory.load("server.conf") val sslEnforced = serverConfig.getBoolean("org.apache.predictionio.server.ssl-enforced") val protocol = if (sslEnforced) "https://" else "http://" + val https: Option[HttpsConnectionContext] = if(sslEnforced){ + val https = ConnectionContext.https(sslContext) + Http().setDefaultServerHttpContext(https) + Some(https) + } else None + def undeploy(ip: String, port: Int): Unit = { val serverUrl = s"${protocol}${ip}:${port}" log.info( @@ -287,81 +307,74 @@ class MasterActor ( def receive: Actor.Receive = { case x: StartServer => - val actor = createServerActor( - sc, - engineInstance, - engineFactoryName) - currentServerActor = Some(actor) undeploy(sc.ip, sc.port) self ! BindServer() case x: BindServer => - currentServerActor map { actor => - val settings = ServerSettings(system) - IO(Http) ! Http.Bind( - actor, - interface = sc.ip, - port = sc.port, - settings = Some(settings.copy(sslEncryption = sslEnforced))) - } getOrElse { - log.error("Cannot bind a non-existing server backend.") + currentServerBinding match { + case Some(_) => + log.error("Cannot bind a non-existing server backend.") + case None => + val server = createServer(sc, engineInstance, engineFactoryName) + val route = server.createRoute() + val binding = https match { + case Some(https) => + Http().bindAndHandle(route, sc.ip, sc.port, connectionContext = https) + case None => + Http().bindAndHandle(route, sc.ip, sc.port) + } + currentServerBinding = Some(binding) + + val serverUrl = s"${protocol}${sc.ip}:${sc.port}" + log.info(s"Engine is deployed and running. 
Engine API is live at ${serverUrl}.") } case x: StopServer => log.info(s"Stop server command received.") - sprayHttpListener.map { l => - log.info("Server is shutting down.") - l ! Http.Unbind(5.seconds) - system.shutdown() - } getOrElse { - log.warning("No active server is running.") + currentServerBinding match { + case Some(f) => + f.flatMap { binding => + binding.unbind() + }.foreach { _ => + system.terminate() + } + case None => + log.warning("No active server is running.") } case x: ReloadServer => log.info("Reload server command received.") - val latestEngineInstance = - CreateServer.engineInstances.getLatestCompleted( - engineInstance.engineId, - engineInstance.engineVersion, - engineInstance.engineVariant) - latestEngineInstance map { lr => - val actor = createServerActor(sc, lr, engineFactoryName) - sprayHttpListener.map { l => - l ! Http.Unbind(5.seconds) - val settings = ServerSettings(system) - IO(Http) ! Http.Bind( - actor, - interface = sc.ip, - port = sc.port, - settings = Some(settings.copy(sslEncryption = sslEnforced))) - currentServerActor.get ! Kill - currentServerActor = Some(actor) - } getOrElse { - log.warning("No active server is running. Abort reloading.") - } - } getOrElse { - log.warning( - s"No latest completed engine instance for ${engineInstance.engineId} " + - s"${engineInstance.engineVersion}. Abort reloading.") - } - case x: Http.Bound => - val serverUrl = s"${protocol}${sc.ip}:${sc.port}" - log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.") - sprayHttpListener = Some(sender) - case x: Http.CommandFailed => - if (retry > 0) { - retry -= 1 - log.error(s"Bind failed. Retrying... ($retry more trial(s))") - context.system.scheduler.scheduleOnce(1.seconds) { - self ! 
BindServer() + currentServerBinding match { + case Some(f) => + f.flatMap { binding => + binding.unbind() + } + val latestEngineInstance = + CreateServer.engineInstances.getLatestCompleted( + engineInstance.engineId, + engineInstance.engineVersion, + engineInstance.engineVariant) + latestEngineInstance map { lr => + val server = createServer(sc, lr, engineFactoryName) + val route = server.createRoute() + val binding = https match { + case Some(https) => + Http().bindAndHandle(route, sc.ip, sc.port, connectionContext = https) + case None => + Http().bindAndHandle(route, sc.ip, sc.port) + } + currentServerBinding = Some(binding) + } getOrElse { + log.warning( + s"No latest completed engine instance for ${engineInstance.engineId} " + + s"${engineInstance.engineVersion}. Abort reloading.") + } + case None => + log.warning("No active server is running. Abort reloading.") } - } else { - log.error("Bind failed. Shutting down.") - system.shutdown() - } } - def createServerActor( + def createServer( sc: ServerConfig, engineInstance: EngineInstance, - engineFactoryName: String): ActorRef = { + engineFactoryName: String): PredictionServer[_, _] = { val (engineLanguage, engineFactory) = WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader) val engine = engineFactory() @@ -373,7 +386,7 @@ class MasterActor ( val deployableEngine = engine.asInstanceOf[Engine[_,_,_,_,_,_]] - CreateServer.createServerActorWithEngine( + CreateServer.createPredictionServerWithEngine( sc, engineInstance, // engine, @@ -382,7 +395,7 @@ class MasterActor ( } } -class ServerActor[Q, P]( +class PredictionServer[Q, P]( val args: ServerConfig, val engineInstance: EngineInstance, val engine: Engine[_, _, _, Q, P, _], @@ -393,23 +406,22 @@ class ServerActor[Q, P]( val algorithmsParams: Seq[Params], val models: Seq[Any], val serving: BaseServing[Q, P], - val servingParams: Params) extends Actor with HttpService with KeyAuthentication { + val servingParams: Params, + val system: ActorSystem) 
extends KeyAuthentication { + + val log = Logging(system, getClass) val serverStartTime = DateTime.now - val log = Logging(context.system, this) var requestCount: Int = 0 var avgServingSec: Double = 0.0 var lastServingSec: Double = 0.0 - /** The following is required by HttpService */ - def actorRefFactory: ActorContext = context - implicit val timeout = Timeout(5, TimeUnit.SECONDS) + val pluginsActorRef = - context.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor") - val pluginContext = EngineServerPluginContext(log, args.engineVariant) + system.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor") - def receive: Actor.Receive = runRoute(myRoute) + val pluginContext = EngineServerPluginContext(log, args.engineVariant) val feedbackEnabled = if (args.feedback) { if (args.accessKey.isEmpty) { @@ -433,37 +445,44 @@ class ServerActor[Q, P]( } } - val myRoute = - path("") { - get { - respondWithMediaType(`text/html`) { - detach() { - complete { - html.index( - args, - engineInstance, - algorithms.map(_.toString), - algorithmsParams.map(_.toString), - models.map(_.toString), - dataSourceParams.toString, - preparatorParams.toString, - servingParams.toString, - serverStartTime, - feedbackEnabled, - args.eventServerIp, - args.eventServerPort, - requestCount, - avgServingSec, - lastServingSec - ).toString - } - } - } + def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]): + AuthenticationDirective[T] = { + extractRequestContext.flatMap { requestContext => + onSuccess(authenticator(requestContext)).flatMap { + case Right(x) => provide(x) + case Left(x) => reject(x): Directive1[T] } - } ~ - path("queries.json") { - post { - detach() { + } + } + + def createRoute(): Route = { + val myRoute = + path("") { + get { + complete(HttpResponse(entity = HttpEntity( + `text/html(UTF-8)`, + html.index( + args, + engineInstance, + algorithms.map(_.toString), + algorithmsParams.map(_.toString), + 
models.map(_.toString), + dataSourceParams.toString, + preparatorParams.toString, + servingParams.toString, + serverStartTime, + feedbackEnabled, + args.eventServerIp, + args.eventServerPort, + requestCount, + avgServingSec, + lastServingSec + ).toString + ))) + } + } ~ + path("queries.json") { + post { entity(as[String]) { queryString => try { val servingStartTime = DateTime.now @@ -584,9 +603,8 @@ class ServerActor[Q, P]( (requestCount + 1) requestCount += 1 - respondWithMediaType(`application/json`) { - complete(compact(render(pluginResult))) - } + complete(compact(render(pluginResult))) + } catch { case e: MappingException => val msg = s"Query:\n$queryString\n\nStack Trace:\n" + @@ -613,83 +631,76 @@ class ServerActor[Q, P]( } } } - } - } ~ - path("reload") { - authenticate(withAccessKeyFromFile) { request => - post { - complete { - context.actorSelection("/user/master") ! ReloadServer() - "Reloading..." + } ~ + path("reload") { + authenticate(withAccessKeyFromFile) { request => + post { + system.actorSelection("/user/master") ! ReloadServer() + complete("Reloading...") } } - } - } ~ - path("stop") { - authenticate(withAccessKeyFromFile) { request => - post { - complete { - context.system.scheduler.scheduleOnce(1.seconds) { - context.actorSelection("/user/master") ! StopServer() + } ~ + path("stop") { + authenticate(withAccessKeyFromFile) { request => + post { + system.scheduler.scheduleOnce(1.seconds) { + system.actorSelection("/user/master") ! StopServer() } - "Shutting down..." 
+ complete("Shutting down...") } } - } - } ~ - pathPrefix("assets") { - getFromResourceDirectory("assets") - } ~ - path("plugins.json") { - import EngineServerJson4sSupport._ - get { - respondWithMediaType(MediaTypes.`application/json`) { - complete { + } ~ + pathPrefix("assets") { + getFromResourceDirectory("assets") + } ~ + path("plugins.json") { + import EngineServerJson4sSupport._ + get { + complete( Map("plugins" -> Map( "outputblockers" -> pluginContext.outputBlockers.map { case (n, p) => n -> Map( - "name" -> p.pluginName, + "name" -> p.pluginName, "description" -> p.pluginDescription, - "class" -> p.getClass.getName, - "params" -> pluginContext.pluginParams(p.pluginName)) + "class" -> p.getClass.getName, + "params" -> pluginContext.pluginParams(p.pluginName)) }, "outputsniffers" -> pluginContext.outputSniffers.map { case (n, p) => n -> Map( - "name" -> p.pluginName, + "name" -> p.pluginName, "description" -> p.pluginDescription, - "class" -> p.getClass.getName, - "params" -> pluginContext.pluginParams(p.pluginName)) + "class" -> p.getClass.getName, + "params" -> pluginContext.pluginParams(p.pluginName)) } )) - } + ) } - } - } ~ - path("plugins" / Segments) { segments => - import EngineServerJson4sSupport._ - get { - respondWithMediaType(MediaTypes.`application/json`) { - complete { - val pluginArgs = segments.drop(2) - val pluginType = segments(0) - val pluginName = segments(1) - pluginType match { - case EngineServerPlugin.outputBlocker => - pluginContext.outputBlockers(pluginName).handleREST( - pluginArgs) - case EngineServerPlugin.outputSniffer => - pluginsActorRef ? 
PluginsActor.HandleREST( - pluginName = pluginName, - pluginArgs = pluginArgs) map { - _.asInstanceOf[String] - } - } + } ~ + path("plugins" / Segments) { segments => + import EngineServerJson4sSupport._ + get { + val pluginArgs = segments.drop(2) + val pluginType = segments(0) + val pluginName = segments(1) + pluginType match { + case EngineServerPlugin.outputBlocker => + complete(HttpResponse(entity = HttpEntity( + `application/json`, + pluginContext.outputBlockers(pluginName).handleREST(pluginArgs)))) + + case EngineServerPlugin.outputSniffer => + complete(pluginsActorRef ? PluginsActor.HandleREST( + pluginName = pluginName, + pluginArgs = pluginArgs) map { json => + HttpResponse(entity = HttpEntity( + `application/json`, + json.asInstanceOf[String] + )) + }) } } } - } -} -object EngineServerJson4sSupport extends Json4sSupport { - implicit def json4sFormats: Formats = DefaultFormats + myRoute + } } diff --git a/data/build.sbt b/data/build.sbt index 87b0d96a2..23dff1d5e 100644 --- a/data/build.sbt +++ b/data/build.sbt @@ -22,12 +22,9 @@ name := "apache-predictionio-data" libraryDependencies ++= Seq( "com.github.nscala-time" %% "nscala-time" % "2.6.0", "com.google.guava" % "guava" % "14.0.1", - "io.spray" %% "spray-can" % "1.3.3", - "io.spray" %% "spray-routing" % "1.3.3", - "io.spray" %% "spray-testkit" % "1.3.3" % "test", + "com.typesafe.akka" %% "akka-http-testkit" % "10.1.5" % "test", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.json4s" %% "json4s-native" % json4sVersion.value, "org.scalatest" %% "scalatest" % "2.1.7" % "test", "org.specs2" %% "specs2" % "3.3.1" % "test" exclude("org.scalaz.stream", s"scalaz-stream_${scalaBinaryVersion.value}"), diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala index 60efea246..02355ceba 100644 --- 
a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala @@ -15,68 +15,58 @@ * limitations under the License. */ - package org.apache.predictionio.data.api -import org.apache.predictionio.data.webhooks.ConnectorException +import akka.http.scaladsl.server._ import org.apache.predictionio.data.storage.StorageException - -import spray.routing._ -import spray.routing.Directives._ -import spray.routing.Rejection -import spray.http.StatusCodes -import spray.httpx.Json4sSupport - -import org.json4s.Formats -import org.json4s.DefaultFormats +import org.apache.predictionio.data.webhooks.ConnectorException +import org.json4s.{DefaultFormats, Formats} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import org.apache.predictionio.akkahttpjson4s.Json4sSupport._ object Common { - object Json4sProtocol extends Json4sSupport { + object Json4sProtocol { + implicit val serialization = org.json4s.native.Serialization implicit def json4sFormats: Formats = DefaultFormats } import Json4sProtocol._ - val rejectionHandler = RejectionHandler { - case MalformedRequestContentRejection(msg, _) :: _ => + val exceptionHandler = ExceptionHandler { + case e: ConnectorException => { + complete(StatusCodes.BadRequest, Map("message" -> s"${e.getMessage()}")) + } + case e: StorageException => { + complete(StatusCodes.InternalServerError, Map("message" -> s"${e.getMessage()}")) + } + case e: Exception => { + complete(StatusCodes.InternalServerError, Map("message" -> s"${e.getMessage()}")) + } + } + + val rejectionHandler = RejectionHandler.newBuilder().handle { + case MalformedRequestContentRejection(msg, _) => complete(StatusCodes.BadRequest, Map("message" -> msg)) - case MissingQueryParamRejection(msg) :: _ => + + case MissingQueryParamRejection(msg) => complete(StatusCodes.NotFound, Map("message" -> s"missing required query parameter ${msg}.")) - case 
AuthenticationFailedRejection(cause, challengeHeaders) :: _ => { + + case AuthenticationFailedRejection(cause, challengeHeaders) => { val msg = cause match { case AuthenticationFailedRejection.CredentialsRejected => "Invalid accessKey." case AuthenticationFailedRejection.CredentialsMissing => "Missing accessKey." } - complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg)) - } - case ChannelRejection(msg) :: _ => complete(StatusCodes.Unauthorized, Map("message" -> msg)) - case NonExistentAppRejection(msg) :: _ => - complete(StatusCodes.Unauthorized, Map("message" -> msg)) - } - - val exceptionHandler = ExceptionHandler { - case e: ConnectorException => { - val msg = s"${e.getMessage()}" - complete(StatusCodes.BadRequest, Map("message" -> msg)) } - case e: StorageException => { - val msg = s"${e.getMessage()}" - complete(StatusCodes.InternalServerError, Map("message" -> msg)) - } - case e: Exception => { - val msg = s"${e.getMessage()}" - complete(StatusCodes.InternalServerError, Map("message" -> msg)) - } - } + case ChannelRejection(msg) => + complete(StatusCodes.Unauthorized, Map("message" -> msg)) + }.result() } /** invalid channel */ case class ChannelRejection(msg: String) extends Rejection - -/** the app doesn't exist */ -case class NonExistentAppRejection(msg: String) extends Rejection diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala index 41dfefb50..96ff4d076 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala @@ -18,646 +18,543 @@ package org.apache.predictionio.data.api -import akka.event.Logging +import akka.event.{Logging, LoggingAdapter} import sun.misc.BASE64Decoder - import java.util.concurrent.TimeUnit import akka.actor._ -import akka.io.IO +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.{FormData, 
HttpEntity, HttpResponse, StatusCodes} +import akka.http.scaladsl.model.ContentTypes._ +import akka.http.scaladsl.model.headers.HttpChallenge +import akka.http.scaladsl.server.Directives.complete +import akka.http.scaladsl.server.directives._ +import akka.http.scaladsl.server._ import akka.pattern.ask import akka.util.Timeout -import org.apache.predictionio.data.Utils -import org.apache.predictionio.data.storage.AccessKeys -import org.apache.predictionio.data.storage.Channels -import org.apache.predictionio.data.storage.DateTimeJson4sSupport -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventJson4sSupport -import org.apache.predictionio.data.storage.BatchEventsJson4sSupport -import org.apache.predictionio.data.storage.LEvents -import org.apache.predictionio.data.storage.Storage -import org.json4s.DefaultFormats -import org.json4s.Formats -import org.json4s.JObject -import org.json4s.native.JsonMethods.parse -import spray.can.Http -import spray.http.FormData -import spray.http.MediaTypes -import spray.http.StatusCodes -import spray.httpx.Json4sSupport -import spray.routing._ -import spray.routing.authentication.Authentication - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Try, Success, Failure} - -class EventServiceActor( - val eventClient: LEvents, - val accessKeysClient: AccessKeys, - val channelsClient: Channels, - val config: EventServerConfig) extends HttpServiceActor { - - object Json4sProtocol extends Json4sSupport { - implicit def json4sFormats: Formats = DefaultFormats + - new EventJson4sSupport.APISerializer + - new BatchEventsJson4sSupport.APISerializer + - // NOTE: don't use Json4s JodaTimeSerializers since it has issues, - // some format not converted, or timezone not correct - new DateTimeJson4sSupport.Serializer - } - - - val MaxNumberOfEventsPerBatchRequest = 50 - - val logger = Logging(context.system, this) - - // we use the enclosing ActorContext's or ActorSystem's 
dispatcher for our - // Futures - implicit def executionContext: ExecutionContext = context.dispatcher - - implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - val rejectionHandler = Common.rejectionHandler +import akka.http.scaladsl.server.Directives._ +import akka.stream.ActorMaterializer +import org.apache.predictionio.data.storage._ +import org.apache.predictionio.akkahttpjson4s.Json4sSupport._ +import org.json4s.{DefaultFormats, Formats, JObject} + +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} + +object Json4sProtocol { + implicit val serialization = org.json4s.native.Serialization + implicit def json4sFormats: Formats = DefaultFormats + + new EventJson4sSupport.APISerializer + + new BatchEventsJson4sSupport.APISerializer + + // NOTE: don't use Json4s JodaTimeSerializers since it has issues, + // some format not converted, or timezone not correct + new DateTimeJson4sSupport.Serializer +} - val jsonPath = """(.+)\.json$""".r - val formPath = """(.+)\.form$""".r +case class EventServerConfig( + ip: String = "localhost", + port: Int = 7070, + plugins: String = "plugins", + stats: Boolean = false) - val pluginContext = EventServerPluginContext(logger) +object EventServer { + import Json4sProtocol._ + import FutureDirectives._ + import Common._ + private val MaxNumberOfEventsPerBatchRequest = 50 private lazy val base64Decoder = new BASE64Decoder + private implicit val timeout = Timeout(5, TimeUnit.SECONDS) + private case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String]) - case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String]) - - /* with accessKey in query/header, return appId if succeed */ - def withAccessKey: RequestContext => Future[Authentication[AuthData]] = { - ctx: RequestContext => - val accessKeyParamOpt = ctx.request.uri.query.get("accessKey") - val channelParamOpt = ctx.request.uri.query.get("channel") - Future { - // with accessKey in query, 
return appId if succeed - accessKeyParamOpt.map { accessKeyParam => - accessKeysClient.get(accessKeyParam).map { k => - channelParamOpt.map { ch => - val channelMap = - channelsClient.getByAppid(k.appid) - .map(c => (c.name, c.id)).toMap - if (channelMap.contains(ch)) { - Right(AuthData(k.appid, Some(channelMap(ch)), k.events)) - } else { - Left(ChannelRejection(s"Invalid channel '$ch'.")) - } - }.getOrElse{ - Right(AuthData(k.appid, None, k.events)) - } - }.getOrElse(FailedAuth) - }.getOrElse { - // with accessKey in header, return appId if succeed - ctx.request.headers.find(_.name == "Authorization").map { authHeader => - authHeader.value.split("Basic ") match { - case Array(_, value) => - val appAccessKey = - new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0) - accessKeysClient.get(appAccessKey) match { - case Some(k) => Right(AuthData(k.appid, None, k.events)) - case None => FailedAuth - } - - case _ => FailedAuth - } - }.getOrElse(MissedAuth) - } - } - } - - private val FailedAuth = Left( + private def FailedAuth[T]: Either[Rejection, T] = Left( AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, Nil + AuthenticationFailedRejection.CredentialsRejected, HttpChallenge("eventserver", None) ) ) - private val MissedAuth = Left( + private def MissedAuth[T]: Either[Rejection, T] = Left( AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsMissing, Nil + AuthenticationFailedRejection.CredentialsMissing, HttpChallenge("eventserver", None) ) ) - lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor") - lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor") + def createRoute(eventClient: LEvents, + accessKeysClient: AccessKeys, + channelsClient: Channels, + logger: LoggingAdapter, + statsActorRef: ActorSelection, + pluginsActorRef: ActorSelection, + config: EventServerConfig)(implicit executionContext: ExecutionContext): Route = { + + /* with accessKey in 
query/header, return appId if succeed */ + def withAccessKey: RequestContext => Future[Either[Rejection, AuthData]] = { + ctx: RequestContext => + val accessKeyParamOpt = ctx.request.uri.query().get("accessKey") + val channelParamOpt = ctx.request.uri.query().get("channel") + Future { + // with accessKey in query, return appId if succeed + accessKeyParamOpt.map { accessKeyParam => + accessKeysClient.get(accessKeyParam).map { k => + channelParamOpt.map { ch => + val channelMap = + channelsClient.getByAppid(k.appid) + .map(c => (c.name, c.id)).toMap + if (channelMap.contains(ch)) { + Right(AuthData(k.appid, Some(channelMap(ch)), k.events)) + } else { + Left(ChannelRejection(s"Invalid channel '$ch'.")) + } + }.getOrElse{ + Right(AuthData(k.appid, None, k.events)) + } + }.getOrElse(FailedAuth) + }.getOrElse { + // with accessKey in header, return appId if succeed + ctx.request.headers.find(_.name == "Authorization").map { authHeader => + authHeader.value.split("Basic ") match { + case Array(_, value) => + val appAccessKey = + new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0) + accessKeysClient.get(appAccessKey) match { + case Some(k) => Right(AuthData(k.appid, None, k.events)) + case None => FailedAuth + } - val route: Route = - pathSingleSlash { - import Json4sProtocol._ + case _ => FailedAuth + } + }.getOrElse(MissedAuth) + } + } + } - get { - respondWithMediaType(MediaTypes.`application/json`) { - complete(Map("status" -> "alive")) + def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]): + AuthenticationDirective[T] = { + handleRejections(rejectionHandler).tflatMap { _ => + extractRequestContext.flatMap { requestContext => + onSuccess(authenticator(requestContext)).flatMap { + case Right(x) => provide(x) + case Left(x) => reject(x): Directive1[T] + } } } - } ~ - path("plugins.json") { - import Json4sProtocol._ - get { - respondWithMediaType(MediaTypes.`application/json`) { - complete { + } + + val pluginContext = 
EventServerPluginContext(logger) + val jsonPath = """(.+)\.json$""".r + val formPath = """(.+)\.form$""".r + + val route: Route = + pathSingleSlash { + get { + complete(Map("status" -> "alive")) + } + } ~ + path("plugins.json") { + get { + complete( Map("plugins" -> Map( "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) => n -> Map( - "name" -> p.pluginName, + "name" -> p.pluginName, "description" -> p.pluginDescription, - "class" -> p.getClass.getName) + "class" -> p.getClass.getName) }, "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) => n -> Map( - "name" -> p.pluginName, + "name" -> p.pluginName, "description" -> p.pluginDescription, - "class" -> p.getClass.getName) + "class" -> p.getClass.getName) } )) - } + ) } - } - } ~ - path("plugins" / Segments) { segments => - get { - handleExceptions(Common.exceptionHandler) { - authenticate(withAccessKey) { authData => - respondWithMediaType(MediaTypes.`application/json`) { - complete { - val pluginArgs = segments.drop(2) - val pluginType = segments(0) - val pluginName = segments(1) - pluginType match { - case EventServerPlugin.inputBlocker => + } ~ + path("plugins" / Segments) { segments => + get { + handleExceptions(exceptionHandler) { + authenticate(withAccessKey) { authData => + val pluginArgs = segments.drop(2) + val pluginType = segments(0) + val pluginName = segments(1) + pluginType match { + case EventServerPlugin.inputBlocker => + complete(HttpResponse(entity = HttpEntity( + `application/json`, pluginContext.inputBlockers(pluginName).handleREST( authData.appId, authData.channelId, pluginArgs) - case EventServerPlugin.inputSniffer => - pluginsActorRef ? PluginsActor.HandleREST( - appId = authData.appId, - channelId = authData.channelId, - pluginName = pluginName, - pluginArgs = pluginArgs) map { - _.asInstanceOf[String] - } - } + ))) + + case EventServerPlugin.inputSniffer => + complete(pluginsActorRef ? 
PluginsActor.HandleREST( + appId = authData.appId, + channelId = authData.channelId, + pluginName = pluginName, + pluginArgs = pluginArgs) map { json => + HttpResponse(entity = HttpEntity( + `application/json`, + json.asInstanceOf[String] + )) + }) } } } } - } - } ~ - path("events" / jsonPath ) { eventId => - - import Json4sProtocol._ - - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + path("events" / jsonPath ) { eventId => + get { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - logger.debug(s"GET event ${eventId}.") - val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt => - eventOpt.map( event => - (StatusCodes.OK, event) - ).getOrElse( - (StatusCodes.NotFound, Map("message" -> "Not Found")) - ) - } - data - } + logger.debug(s"GET event ${eventId}.") + onSuccess(eventClient.futureGet(eventId, appId, channelId)){ eventOpt => + eventOpt.map { event => + complete(StatusCodes.OK, event) + }.getOrElse( + complete(StatusCodes.NotFound, Map("message" -> "Not Found")) + ) } } } - } - } ~ - delete { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + delete { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - logger.debug(s"DELETE event ${eventId}.") - val data = eventClient.futureDelete(eventId, appId, channelId).map { found => - if (found) { - (StatusCodes.OK, Map("message" -> "Found")) - } else { - (StatusCodes.NotFound, Map("message" -> "Not Found")) - } - } - data + logger.debug(s"DELETE event ${eventId}.") + onSuccess(eventClient.futureDelete(eventId, appId, channelId)){ found => + if (found) { + complete(StatusCodes.OK, 
Map("message" -> "Found")) + } else { + complete(StatusCodes.NotFound, Map("message" -> "Not Found")) } } } } } - } - } ~ - path("events.json") { - - import Json4sProtocol._ - - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + path("events.json") { + post { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId val events = authData.events entity(as[Event]) { event => - complete { - if (events.isEmpty || authData.events.contains(event.event)) { - pluginContext.inputBlockers.values.foreach( - _.process(EventInfo( - appId = appId, - channelId = channelId, - event = event), pluginContext)) - val data = eventClient.futureInsert(event, appId, channelId).map { id => - pluginsActorRef ! EventInfo( - appId = appId, - channelId = channelId, - event = event) - val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) - if (config.stats) { - statsActorRef ! Bookkeeping(appId, result._1, event) - } - result + if (events.isEmpty || authData.events.contains(event.event)) { + pluginContext.inputBlockers.values.foreach( + _.process(EventInfo( + appId = appId, + channelId = channelId, + event = event), pluginContext)) + onSuccess(eventClient.futureInsert(event, appId, channelId)){ id => + pluginsActorRef ! EventInfo( + appId = appId, + channelId = channelId, + event = event) + val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) + if (config.stats) { + statsActorRef ! 
Bookkeeping(appId, result._1, event) } - data - } else { - (StatusCodes.Forbidden, - Map("message" -> s"${event.event} events are not allowed")) + complete(result) } + } else { + complete(StatusCodes.Forbidden, + Map("message" -> s"${event.event} events are not allowed")) } } } } - } - } ~ - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + get { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId parameters( - 'startTime.as[Option[String]], - 'untilTime.as[Option[String]], - 'entityType.as[Option[String]], - 'entityId.as[Option[String]], - 'event.as[Option[String]], - 'targetEntityType.as[Option[String]], - 'targetEntityId.as[Option[String]], - 'limit.as[Option[Int]], - 'reversed.as[Option[Boolean]]) { + 'startTime.?, + 'untilTime.?, + 'entityType.?, + 'entityId.?, + 'event.?, + 'targetEntityType.?, + 'targetEntityId.?, + 'limit.as[Int].?, + 'reversed.as[Boolean].?) 
{ (startTimeStr, untilTimeStr, entityType, entityId, - eventName, // only support one event name - targetEntityType, targetEntityId, - limit, reversed) => - respondWithMediaType(MediaTypes.`application/json`) { - complete { - logger.debug( - s"GET events of appId=${appId} " + - s"st=${startTimeStr} ut=${untilTimeStr} " + - s"et=${entityType} eid=${entityId} " + - s"li=${limit} rev=${reversed} ") - - require(!((reversed == Some(true)) - && (entityType.isEmpty || entityId.isEmpty)), - "the parameter reversed can only be used with" + + eventName, // only support one event name + targetEntityType, targetEntityId, + limit, reversed) => + logger.debug( + s"GET events of appId=${appId} " + + s"st=${startTimeStr} ut=${untilTimeStr} " + + s"et=${entityType} eid=${entityId} " + + s"li=${limit} rev=${reversed} ") + + require(!((reversed == Some(true)) + && (entityType.isEmpty || entityId.isEmpty)), + "the parameter reversed can only be used with" + " both entityType and entityId specified.") - val parseTime = Future { - val startTime = startTimeStr.map(Utils.stringToDateTime(_)) - val untilTime = untilTimeStr.map(Utils.stringToDateTime(_)) - (startTime, untilTime) - } + val parseTime = Future { + val startTime = startTimeStr.map(Utils.stringToDateTime(_)) + val untilTime = untilTimeStr.map(Utils.stringToDateTime(_)) + (startTime, untilTime) + } - parseTime.flatMap { case (startTime, untilTime) => - val data = eventClient.futureFind( - appId = appId, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - entityType = entityType, - entityId = entityId, - eventNames = eventName.map(List(_)), - targetEntityType = targetEntityType.map(Some(_)), - targetEntityId = targetEntityId.map(Some(_)), - limit = limit.orElse(Some(20)), - reversed = reversed) - .map { eventIter => - if (eventIter.hasNext) { - (StatusCodes.OK, eventIter.toArray) - } else { - (StatusCodes.NotFound, - Map("message" -> "Not Found")) - } + val f = parseTime.flatMap { case (startTime, 
untilTime) => + val data = eventClient.futureFind( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventName.map(List(_)), + targetEntityType = targetEntityType.map(Some(_)), + targetEntityId = targetEntityId.map(Some(_)), + limit = limit.orElse(Some(20)), + reversed = reversed) + .map { eventIter => + if (eventIter.hasNext) { + (StatusCodes.OK, eventIter.toArray) + } else { + (StatusCodes.NotFound, Map("message" -> "Not Found")) } - data - }.recover { - case e: Exception => - (StatusCodes.BadRequest, Map("message" -> s"${e}")) - } + } + data } + + onSuccess(f){ (status, body) => complete(status, body) } } - } } } } - } - } ~ - path("batch" / "events.json") { - - import Json4sProtocol._ - - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + path("batch" / "events.json") { + post { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId val allowedEvents = authData.events entity(as[Seq[Try[Event]]]) { events => - complete { - if (events.length <= MaxNumberOfEventsPerBatchRequest) { - val eventWithIndex = events.zipWithIndex - - val taggedEvents = eventWithIndex.collect { case (Success(event), i) => - if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){ - (Right(event), i) - } else { - (Left(event), i) - } - } + if (events.length <= MaxNumberOfEventsPerBatchRequest) { + val eventWithIndex = events.zipWithIndex - val insertEvents = taggedEvents.collect { case (Right(event), i) => - (event, i) + val taggedEvents = eventWithIndex.collect { case (Success(event), i) => + if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){ + (Right(event), i) + } else { + (Left(event), i) } + } - insertEvents.foreach { case (event, i) => - pluginContext.inputBlockers.values.foreach( - _.process(EventInfo( - appId = appId, 
- channelId = channelId, - event = event), pluginContext)) - } + val insertEvents = taggedEvents.collect { case (Right(event), i) => + (event, i) + } - val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch( - insertEvents.map(_._1), appId, channelId).map { insertResults => - val results = insertResults.zip(insertEvents).map { case (id, (event, i)) => - pluginsActorRef ! EventInfo( - appId = appId, - channelId = channelId, - event = event) - val status = StatusCodes.Created - if (config.stats) { - statsActorRef ! Bookkeeping(appId, status, event) - } + insertEvents.foreach { case (event, i) => + pluginContext.inputBlockers.values.foreach( + _.process(EventInfo( + appId = appId, + channelId = channelId, + event = event), pluginContext)) + } + + val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch( + insertEvents.map(_._1), appId, channelId).map { insertResults => + val results = insertResults.zip(insertEvents).map { case (id, (event, i)) => + pluginsActorRef ! EventInfo( + appId = appId, + channelId = channelId, + event = event) + val status = StatusCodes.Created + if (config.stats) { + statsActorRef ! 
Bookkeeping(appId, status, event) + } + (Map( + "status" -> status.intValue, + "eventId" -> s"${id}"), i) + } ++ + // Results of denied events + taggedEvents.collect { case (Left(event), i) => (Map( - "status" -> status.intValue, - "eventId" -> s"${id}"), i) + "status" -> StatusCodes.Forbidden.intValue, + "message" -> s"${event.event} events are not allowed"), i) } ++ - // Results of denied events - taggedEvents.collect { case (Left(event), i) => - (Map( - "status" -> StatusCodes.Forbidden.intValue, - "message" -> s"${event.event} events are not allowed"), i) - } ++ - // Results of failed to deserialze events - eventWithIndex.collect { case (Failure(exception), i) => - (Map( - "status" -> StatusCodes.BadRequest.intValue, - "message" -> s"${exception.getMessage()}"), i) - } + // Results of failed to deserialze events + eventWithIndex.collect { case (Failure(exception), i) => + (Map( + "status" -> StatusCodes.BadRequest.intValue, + "message" -> s"${exception.getMessage()}"), i) + } - // Restore original order - results.sortBy { case (_, i) => i }.map { case (data, _) => data } - } + // Restore original order + results.sortBy { case (_, i) => i }.map { case (data, _) => data } + } - f.recover { case exception => - Map( - "status" -> StatusCodes.InternalServerError.intValue, - "message" -> s"${exception.getMessage()}") - } + onSuccess(f.recover { case exception => + Map( + "status" -> StatusCodes.InternalServerError.intValue, + "message" -> s"${exception.getMessage()}" + ) + }){ res => complete(res) } - } else { - (StatusCodes.BadRequest, - Map("message" -> (s"Batch request must have less than or equal to " + - s"${MaxNumberOfEventsPerBatchRequest} events"))) - } + } else { + complete(StatusCodes.BadRequest, + Map("message" -> (s"Batch request must have less than or equal to " + + s"${MaxNumberOfEventsPerBatchRequest} events"))) } } } } } - } - } ~ - path("stats.json") { - - import Json4sProtocol._ - - get { - handleExceptions(Common.exceptionHandler) { - 
handleRejections(rejectionHandler) { + } ~ + path("stats.json") { + get { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId - respondWithMediaType(MediaTypes.`application/json`) { - if (config.stats) { - complete { - statsActorRef ? GetStats(appId) map { - _.asInstanceOf[Map[String, StatsSnapshot]] - } + if (config.stats) { + complete { + statsActorRef ? GetStats(appId) map { + _.asInstanceOf[Map[String, StatsSnapshot]] } - } else { - complete( - StatusCodes.NotFound, - parse("""{"message": "To see stats, launch Event Server """ + - """with --stats argument."}""")) } + } else { + complete( + StatusCodes.NotFound, + Map("message" -> "To see stats, launch Event Server with --stats argument.") + ) } } } - } - } // stats.json get - } ~ - path("webhooks" / jsonPath ) { web => - import Json4sProtocol._ - - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } // stats.json get + } ~ + path("webhooks" / jsonPath ) { web => + post { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - entity(as[JObject]) { jObj => - complete { - Webhooks.postJson( - appId = appId, - channelId = channelId, - web = web, - data = jObj, - eventClient = eventClient, - log = logger, - stats = config.stats, - statsActorRef = statsActorRef) - } + entity(as[JObject]) { jObj => + onSuccess(Webhooks.postJson( + appId = appId, + channelId = channelId, + web = web, + data = jObj, + eventClient = eventClient, + log = logger, + stats = config.stats, + statsActorRef = statsActorRef + )){ + (status, body) => complete(status, body) } } } } - } - } ~ - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + get { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val 
channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - Webhooks.getJson( - appId = appId, - channelId = channelId, - web = web, - log = logger) - } + onSuccess( + Webhooks.getJson( + appId = appId, + channelId = channelId, + web = web, + log = logger) + ){ + (status, body) => complete(status, body) } } } } - } - } ~ - path("webhooks" / formPath ) { web => - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + path("webhooks" / formPath ) { web => + post { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - entity(as[FormData]){ formData => - // logger.debug(formData.toString) - complete { - // respond with JSON - import Json4sProtocol._ - - Webhooks.postForm( - appId = appId, - channelId = channelId, - web = web, - data = formData, - eventClient = eventClient, - log = logger, - stats = config.stats, - statsActorRef = statsActorRef) - } + entity(as[FormData]){ formData => + logger.debug(formData.toString) + onSuccess(Webhooks.postForm( + appId = appId, + channelId = channelId, + web = web, + data = formData, + eventClient = eventClient, + log = logger, + stats = config.stats, + statsActorRef = statsActorRef + )){ + (status, body) => complete(status, body) } } } } - } - } ~ - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { + } ~ + get { + handleExceptions(exceptionHandler) { authenticate(withAccessKey) { authData => val appId = authData.appId val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - // respond with JSON - import Json4sProtocol._ - - Webhooks.getForm( - appId = appId, - channelId = channelId, - web = web, - log = logger) - } + onSuccess(Webhooks.getForm( + appId = appId, + channelId = channelId, + web = web, + log = logger 
+ )){ + (status, body) => complete(status, body) } } } } } - } - - def receive: Actor.Receive = runRoute(route) -} - - - -/* message */ -case class StartServer(host: String, port: Int) - -class EventServerActor( - val eventClient: LEvents, - val accessKeysClient: AccessKeys, - val channelsClient: Channels, - val config: EventServerConfig) extends Actor with ActorLogging { - val child = context.actorOf( - Props(classOf[EventServiceActor], - eventClient, - accessKeysClient, - channelsClient, - config), - "EventServiceActor") - implicit val system = context.system - - def receive: Actor.Receive = { - case StartServer(host, portNum) => { - IO(Http) ! Http.Bind(child, interface = host, port = portNum) - } - case m: Http.Bound => log.info("Bound received. EventServer is ready.") - case m: Http.CommandFailed => log.error("Command failed.") - case _ => log.error("Unknown message.") + route } -} - -case class EventServerConfig( - ip: String = "localhost", - port: Int = 7070, - plugins: String = "plugins", - stats: Boolean = false) -object EventServer { def createEventServer(config: EventServerConfig): ActorSystem = { implicit val system = ActorSystem("EventServerSystem") + implicit val materializer = ActorMaterializer() + implicit val executionContext = system.dispatcher val eventClient = Storage.getLEvents() val accessKeysClient = Storage.getMetaDataAccessKeys() val channelsClient = Storage.getMetaDataChannels() - val serverActor = system.actorOf( - Props( - classOf[EventServerActor], - eventClient, - accessKeysClient, - channelsClient, - config), - "EventServerActor" - ) - if (config.stats) system.actorOf(Props[StatsActor], "StatsActor") - system.actorOf(Props[PluginsActor], "PluginsActor") - serverActor ! 
StartServer(config.ip, config.port) + val statsActorRef = system.actorSelection("/user/StatsActor") + val pluginsActorRef = system.actorSelection("/user/PluginsActor") + + val logger = Logging(system, getClass) + + val route = createRoute(eventClient, accessKeysClient, channelsClient, + logger, statsActorRef, pluginsActorRef, config) + + Http().bindAndHandle(route, config.ip, config.port) + system } } object Run { def main(args: Array[String]): Unit = { - EventServer.createEventServer(EventServerConfig( + val f = EventServer.createEventServer(EventServerConfig( ip = "0.0.0.0", port = 7070)) - .awaitTermination + .whenTerminated + + Await.ready(f, Duration.Inf) } } diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala index 9bbbc2e50..d544b1bcc 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala @@ -18,13 +18,11 @@ package org.apache.predictionio.data.api +import akka.http.scaladsl.model.StatusCode import org.apache.predictionio.data.storage.Event -import spray.http.StatusCode - -import scala.collection.mutable.{ HashMap => MHashMap } +import scala.collection.mutable.{HashMap => MHashMap} import scala.collection.mutable - import com.github.nscala_time.time.Imports.DateTime case class EntityTypesEvent( diff --git a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala index aa9438b57..627d046b1 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala @@ -18,10 +18,9 @@ package org.apache.predictionio.data.api +import akka.http.scaladsl.model.StatusCode import org.apache.predictionio.data.storage.Event -import spray.http.StatusCode - import akka.actor.Actor import akka.event.Logging diff --git 
a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala index 57be03785..e9a9c534e 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala @@ -18,13 +18,10 @@ package org.apache.predictionio.data.api +import akka.http.scaladsl.model.{FormData, StatusCode, StatusCodes} import org.apache.predictionio.data.webhooks.ConnectorUtil import org.apache.predictionio.data.storage.LEvents -import spray.http.StatusCodes -import spray.http.StatusCode -import spray.http.FormData - import org.json4s.JObject import akka.event.LoggingAdapter diff --git a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala index 7a45ca112..24cebc75b 100644 --- a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala +++ b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala @@ -18,55 +18,39 @@ package org.apache.predictionio.data.api -import org.apache.predictionio.data.storage.{Storage, StorageMockContext} -import akka.testkit.TestProbe -import akka.actor.{ActorRef, ActorSystem, Props} -import spray.http.HttpEntity -import spray.http.HttpResponse -import spray.http.ContentTypes -import spray.httpx.RequestBuilding.Get +import akka.event.Logging +import org.apache.predictionio.data.storage.Storage import org.specs2.mutable.Specification +import akka.http.scaladsl.testkit.Specs2RouteTest -class EventServiceSpec extends Specification { - val system = ActorSystem("EventServiceSpecSystem") +class EventServiceSpec extends Specification with Specs2RouteTest { + val eventClient = Storage.getLEvents() + val accessKeysClient = Storage.getMetaDataAccessKeys() + val channelsClient = Storage.getMetaDataChannels() - def createEventServiceActor: ActorRef = { - val eventClient = 
Storage.getLEvents() - val accessKeysClient = Storage.getMetaDataAccessKeys() - val channelsClient = Storage.getMetaDataChannels() + val statsActorRef = system.actorSelection("/user/StatsActor") + val pluginsActorRef = system.actorSelection("/user/PluginsActor") - system.actorOf( - Props( - new EventServiceActor( - eventClient, - accessKeysClient, - channelsClient, - EventServerConfig() - ) - ) - ) - } + val logger = Logging(system, getClass) + val config = EventServerConfig(ip = "0.0.0.0", port = 7070) + val route = EventServer.createRoute( + eventClient, + accessKeysClient, + channelsClient, + logger, + statsActorRef, + pluginsActorRef, + config + ) "GET / request" should { - "properly produce OK HttpResponses" in new StorageMockContext { - Thread.sleep(2000) - val eventServiceActor = createEventServiceActor - val probe = TestProbe()(system) - probe.send(eventServiceActor, Get("/")) - probe.expectMsg( - HttpResponse( - 200, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"status":"alive"}""" - ) - ) - ) - success + "properly produce OK HttpResponses" in { + Get() ~> route ~> check { + status.intValue() shouldEqual 200 + responseAs[String] shouldEqual """{"status":"alive"}""" + } } } - - step(system.shutdown()) } diff --git a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala index 592782451..297c25f86 100644 --- a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala +++ b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala @@ -17,22 +17,20 @@ package org.apache.predictionio.data.api -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.TestProbe +import akka.event.Logging +import akka.http.scaladsl.model.ContentTypes +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.server.Route import org.apache.predictionio.data.storage._ import 
org.joda.time.DateTime -import org.scalamock.specs2.MockContext import org.specs2.mutable.Specification -import spray.http.HttpHeaders.RawHeader -import spray.http.{ContentTypes, HttpEntity, HttpResponse} -import spray.httpx.RequestBuilding._ import sun.misc.BASE64Encoder +import akka.http.scaladsl.testkit.Specs2RouteTest import scala.concurrent.{ExecutionContext, Future} -class SegmentIOAuthSpec extends Specification { +class SegmentIOAuthSpec extends Specification with Specs2RouteTest { - val system = ActorSystem("EventServiceSpecSystem") sequential isolated val eventClient = new LEvents { @@ -74,75 +72,51 @@ class SegmentIOAuthSpec extends Specification { override def get(k: String): Option[AccessKey] = k match { - case "abc" ⇒ Some(AccessKey(k, appId, Seq.empty)) - case _ ⇒ None + case "abc" => Some(AccessKey(k, appId, Seq.empty)) + case _ => None } } - val base64Encoder = new BASE64Encoder + val channelsClient = Storage.getMetaDataChannels() - def createEventServiceActor(): ActorRef = { - val channelsClient = Storage.getMetaDataChannels() - system.actorOf( - Props( - new EventServiceActor( - eventClient, - accessKeysClient, - channelsClient, - EventServerConfig() - ) - ) - ) - } + val statsActorRef = system.actorSelection("/user/StatsActor") + val pluginsActorRef = system.actorSelection("/user/PluginsActor") - "Event Service" should { + val base64Encoder = new BASE64Encoder + val logger = Logging(system, getClass) + val config = EventServerConfig(ip = "0.0.0.0", port = 7070) + + val route = EventServer.createRoute( + eventClient, + accessKeysClient, + channelsClient, + logger, + statsActorRef, + pluginsActorRef, + config + ) + "Event Service" should { "reject with CredentialsRejected with invalid credentials" in new StorageMockContext { - val eventServiceActor = createEventServiceActor val accessKey = "abc123:" - val probe = TestProbe()(system) - probe.send( - eventServiceActor, - Post("/webhooks/segmentio.json") - .withHeaders( - List( - 
RawHeader("Authorization", s"Basic $accessKey") - ) - ) - ) - probe.expectMsg( - HttpResponse( - 401, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"message":"Invalid accessKey."}""" - ) - ) - ) + Post("/webhooks/segmentio.json") + .withHeaders(RawHeader("Authorization", s"Basic $accessKey")) ~> Route.seal(route) ~> check { + status.intValue() shouldEqual 401 + responseAs[String] shouldEqual """{"message":"Invalid accessKey."}""" + } success } + } "reject with CredentialsMissed without credentials" in { - val eventServiceActor = createEventServiceActor - val probe = TestProbe()(system) - probe.send( - eventServiceActor, - Post("/webhooks/segmentio.json") - ) - probe.expectMsg( - HttpResponse( - 401, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"message":"Missing accessKey."}""" - ) - ) - ) + Post("/webhooks/segmentio.json") ~> Route.seal(route) ~> check { + status.intValue() shouldEqual 401 + responseAs[String] shouldEqual """{"message":"Missing accessKey."}""" + } success } "process SegmentIO identity request properly" in { - val eventServiceActor = createEventServiceActor val jsonReq = """ |{ @@ -169,32 +143,15 @@ class SegmentIOAuthSpec extends Specification { val accessKey = "abc:" val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes) - val probe = TestProbe()(system) - probe.send( - eventServiceActor, - Post( - "/webhooks/segmentio.json", - HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes) - ).withHeaders( - List( - RawHeader("Authorization", s"Basic $accessKeyEncoded") - ) - ) - ) - probe.expectMsg( - HttpResponse( - 201, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"eventId":"event_id"}""" - ) - ) - ) + Post("/webhooks/segmentio.json") + .withHeaders(RawHeader("Authorization", s"Basic $accessKeyEncoded")) + .withEntity(ContentTypes.`application/json`, jsonReq) ~> route ~> check { + println(responseAs[String]) + status.intValue() 
shouldEqual 201 + responseAs[String] shouldEqual """{"eventId":"event_id"}""" + } success - } } - - step(system.shutdown()) } diff --git a/docs/manual/source/datacollection/eventapi.html.md b/docs/manual/source/datacollection/eventapi.html.md index dea378238..be4b04073 100644 --- a/docs/manual/source/datacollection/eventapi.html.md +++ b/docs/manual/source/datacollection/eventapi.html.md @@ -67,7 +67,7 @@ Sample response: ``` HTTP/1.1 200 OK -Server: spray-can/1.2.1 +Server: akka-http/10.1.5 Date: Wed, 10 Sep 2014 22:37:30 GMT Content-Type: application/json; charset=UTF-8 Content-Length: 18 @@ -284,7 +284,7 @@ Sample response: ``` HTTP/1.1 201 Created -Server: spray-can/1.2.1 +Server: akka-http/10.1.5 Date: Wed, 10 Sep 2014 22:51:33 GMT Content-Type: application/json; charset=UTF-8 Content-Length: 41 diff --git a/docs/manual/source/datacollection/eventmodel.html.md.erb b/docs/manual/source/datacollection/eventmodel.html.md.erb index ec8e5a889..676de5447 100644 --- a/docs/manual/source/datacollection/eventmodel.html.md.erb +++ b/docs/manual/source/datacollection/eventmodel.html.md.erb @@ -139,7 +139,7 @@ You should see something like the following, meaning the events are imported suc ``` HTTP/1.1 201 Created -Server: spray-can/1.3.2 +Server: akka-http/10.1.5 Date: Tue, 02 Jun 2015 23:13:58 GMT Content-Type: application/json; charset=UTF-8 Content-Length: 57 @@ -339,7 +339,7 @@ The order in the response array is corresponding to the order of the request arr ###Sample Response: HTTP/1.1 200 Successful - Server: spray-can/1.2.1 + Server: akka-http/10.1.5 Date: Wed, 10 Sep 2014 22:51:33 GMT Content-Type: application/json; charset=UTF-8 Content-Length: 41 diff --git a/docs/manual/source/deploy/monitoring.html.md b/docs/manual/source/deploy/monitoring.html.md index 191898b67..886b0015b 100644 --- a/docs/manual/source/deploy/monitoring.html.md +++ b/docs/manual/source/deploy/monitoring.html.md @@ -119,7 +119,7 @@ Be sure to adjust your deploy command to your environment 
(driver-memry, postgre exit 0 ``` -There can be cases when the process is running but the engine is down however. If the spray REST API used by PredictionIO crashes, the engine process continues but the engine to fail when queried. +There can be cases when the process is running but the engine is down however. If the Akka HTTP REST API used by PredictionIO crashes, the engine process continues but the engine to fail when queried. This sort of crash can be taken care of by using monits `check program` capability. diff --git a/docs/manual/source/index.html.md.erb b/docs/manual/source/index.html.md.erb index fb715f6da..428927845 100644 --- a/docs/manual/source/index.html.md.erb +++ b/docs/manual/source/index.html.md.erb @@ -41,7 +41,7 @@ scientists to create predictive engines for any machine learning task. It lets y * simplify data infrastructure management. Apache PredictionIO® can be [installed](/install/) as a full machine -learning stack, bundled with **Apache Spark**, **MLlib**, **HBase**, **Spray** +learning stack, bundled with **Apache Spark**, **MLlib**, **HBase**, **Akka HTTP** and **Elasticsearch**, which simplifies and accelerates scalable machine learning infrastructure management. 
diff --git a/tools/build.sbt b/tools/build.sbt index 9375f2aff..acdb1fe4a 100644 --- a/tools/build.sbt +++ b/tools/build.sbt @@ -21,11 +21,11 @@ import sbtassembly.AssemblyPlugin.autoImport._ name := "apache-predictionio-tools" libraryDependencies ++= Seq( - "com.github.zafarkhaja" % "java-semver" % "0.9.0", - "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion.value, - "io.spray" %% "spray-testkit" % "1.3.3" % "test", - "org.specs2" %% "specs2" % "2.3.13" % "test") + "com.github.zafarkhaja" % "java-semver" % "0.9.0", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion.value, + "com.typesafe.akka" %% "akka-http-testkit" % "10.1.5" % "test", + "org.specs2" %% "specs2-core" % "4.2.0" % "test") assemblyMergeStrategy in assembly := { case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala index 7e8fd300e..d8bb79f0f 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala @@ -14,148 +14,116 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - - package org.apache.predictionio.tools.admin -import akka.actor.{Actor, ActorSystem, Props} -import akka.event.Logging -import akka.io.IO -import akka.util.Timeout -import org.apache.predictionio.data.api.StartServer -import org.apache.predictionio.data.storage.Storage -import org.json4s.{Formats, DefaultFormats} - import java.util.concurrent.TimeUnit -import spray.can.Http -import spray.http.{MediaTypes, StatusCodes} -import spray.httpx.Json4sSupport -import spray.routing._ +import akka.http.scaladsl.server._ +import org.apache.predictionio.data.storage._ -import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.http.scaladsl.Http +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.util.Timeout +import org.apache.predictionio.akkahttpjson4s.Json4sSupport._ +import org.json4s.{DefaultFormats, Formats} -class AdminServiceActor(val commandClient: CommandClient) - extends HttpServiceActor { +object Json4sProtocol { + implicit val serialization = org.json4s.jackson.Serialization + implicit def json4sFormats: Formats = DefaultFormats +} - object Json4sProtocol extends Json4sSupport { - implicit def json4sFormats: Formats = DefaultFormats - } +case class AdminServerConfig( + ip: String = "localhost", + port: Int = 7071 +) +object AdminServer { import Json4sProtocol._ - val log = Logging(context.system, this) - - // we use the enclosing ActorContext's or ActorSystem's dispatcher for our - // Futures - implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher - implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS) + private implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS) // for better message response - val rejectionHandler = RejectionHandler { - case MalformedRequestContentRejection(msg, _) :: _ => + private val rejectionHandler 
= RejectionHandler.newBuilder().handle { + case MalformedRequestContentRejection(msg, _) => complete(StatusCodes.BadRequest, Map("message" -> msg)) - case MissingQueryParamRejection(msg) :: _ => + case MissingQueryParamRejection(msg) => complete(StatusCodes.NotFound, Map("message" -> s"missing required query parameter ${msg}.")) - case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => + case AuthenticationFailedRejection(cause, challengeHeaders) => complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> s"Invalid accessKey.")) - } + }.result() - val jsonPath = """(.+)\.json$""".r + def createRoute()(implicit executionContext: ExecutionContext): Route = { + + val commandClient = new CommandClient( + appClient = Storage.getMetaDataApps, + accessKeyClient = Storage.getMetaDataAccessKeys, + eventClient = Storage.getLEvents() + ) - val route: Route = - pathSingleSlash { - get { - respondWithMediaType(MediaTypes.`application/json`) { + val route = + pathSingleSlash { + get { complete(Map("status" -> "alive")) } - } - } ~ + } ~ path("cmd" / "app" / Segment / "data") { appName => { delete { - respondWithMediaType(MediaTypes.`application/json`) { - complete(commandClient.futureAppDataDelete(appName)) - } + complete(commandClient.futureAppDataDelete(appName)) } } } ~ path("cmd" / "app" / Segment) { appName => { delete { - respondWithMediaType(MediaTypes.`application/json`) { - complete(commandClient.futureAppDelete(appName)) - } + complete(commandClient.futureAppDelete(appName)) } } } ~ path("cmd" / "app") { get { - respondWithMediaType(MediaTypes.`application/json`) { - complete(commandClient.futureAppList()) - } + complete(commandClient.futureAppList()) } ~ - post { - entity(as[AppRequest]) { - appArgs => respondWithMediaType(MediaTypes.`application/json`) { - complete(commandClient.futureAppNew(appArgs)) + post { + entity(as[AppRequest]) { + appArgs => + onSuccess(commandClient.futureAppNew(appArgs)){ + case res: GeneralResponse => complete(res) 
+ case res: AppNewResponse => complete(res) } - } } + } } - def receive: Actor.Receive = runRoute(route) -} - -class AdminServerActor(val commandClient: CommandClient) extends Actor { - val log = Logging(context.system, this) - val child = context.actorOf( - Props(classOf[AdminServiceActor], commandClient), - "AdminServiceActor") - implicit val system = context.system - - def receive: PartialFunction[Any, Unit] = { - case StartServer(host, portNum) => { - IO(Http) ! Http.Bind(child, interface = host, port = portNum) - - } - case m: Http.Bound => log.info("Bound received. AdminServer is ready.") - case m: Http.CommandFailed => log.error("Command failed.") - case _ => log.error("Unknown message.") + route } -} -case class AdminServerConfig( - ip: String = "localhost", - port: Int = 7071 -) -object AdminServer { def createAdminServer(config: AdminServerConfig): ActorSystem = { implicit val system = ActorSystem("AdminServerSystem") + implicit val materializer = ActorMaterializer() + implicit val executionContext = system.dispatcher - val commandClient = new CommandClient( - appClient = Storage.getMetaDataApps, - accessKeyClient = Storage.getMetaDataAccessKeys, - eventClient = Storage.getLEvents() - ) - - val serverActor = system.actorOf( - Props(classOf[AdminServerActor], commandClient), - "AdminServerActor") - serverActor ! 
StartServer(config.ip, config.port) + val route = createRoute() + Http().bindAndHandle(route, config.ip, config.port) system } } object AdminRun { def main (args: Array[String]) : Unit = { - AdminServer.createAdminServer(AdminServerConfig( + val f = AdminServer.createAdminServer(AdminServerConfig( ip = "localhost", port = 7071)) - .awaitTermination + .whenTerminated + + Await.ready(f, Duration.Inf) } } diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md index 666b5720f..79446650f 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md @@ -26,7 +26,7 @@ $ sbt/sbt "tools/compile" $ set -a $ source conf/pio-env.sh $ set +a -$ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun" +$ sbt/sbt "tools/runMain org.apache.predictionio.tools.admin.AdminRun" ``` ### Unit test (Very minimal) @@ -35,7 +35,7 @@ $ sbt/sbt "tools/run-main org.apache.predictionio.tools.admin.AdminRun" $ set -a $ source conf/pio-env.sh $ set +a -$ sbt/sbt "tools/test-only org.apache.predictionio.tools.admin.AdminAPISpec" +$ sbt/sbt "tools/testOnly org.apache.predictionio.tools.admin.AdminAPISpec" ``` ### Start with pio command adminserver diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala index 8c4c6ae63..54f5d3f78 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala @@ -25,7 +25,7 @@ import org.apache.predictionio.tools.EventServerArgs import org.apache.predictionio.tools.EitherLogging import org.apache.predictionio.tools.Common import org.apache.predictionio.tools.ReturnTypes._ -import org.apache.predictionio.tools.dashboard.Dashboard +import 
org.apache.predictionio.tools.dashboard.DashboardServer import org.apache.predictionio.tools.dashboard.DashboardConfig import org.apache.predictionio.tools.admin.AdminServer import org.apache.predictionio.tools.admin.AdminServerConfig @@ -62,7 +62,7 @@ object Management extends EitherLogging { */ def dashboard(da: DashboardArgs): ActorSystem = { info(s"Creating dashboard at ${da.ip}:${da.port}") - Dashboard.createDashboard(DashboardConfig( + DashboardServer.createDashboard(DashboardConfig( ip = da.ip, port = da.port)) } diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala index ef4581b6f..1b4c8a86e 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala @@ -17,16 +17,13 @@ package org.apache.predictionio.tools.console -import org.apache.predictionio.tools.{ - EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, - DeployArgs, BatchPredictArgs} -import org.apache.predictionio.tools.commands.{ - DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs, - BuildArgs, EngineArgs, Management, Engine, Import, Export, - App => AppCmd, AccessKey => AccessKeysCmd} +import org.apache.predictionio.tools.{BatchPredictArgs, DeployArgs, EventServerArgs, ServerArgs, SparkArgs, WorkflowArgs} +import org.apache.predictionio.tools.commands.{AdminServerArgs, BuildArgs, DashboardArgs, Engine, EngineArgs, Export, ExportArgs, Import, ImportArgs, Management, AccessKey => AccessKeysCmd, App => AppCmd} import org.apache.predictionio.tools.ReturnTypes._ - import grizzled.slf4j.Logging + +import scala.concurrent.Await +import scala.concurrent.duration.Duration import scala.language.implicitConversions import scala.sys.process._ @@ -116,17 +113,17 @@ object Pio extends Logging { ea, engineInstanceId, batchPredictArgs, sparkArgs, pioHome, verbose)) def dashboard(da: DashboardArgs): Int = { - 
Management.dashboard(da).awaitTermination + Await.ready(Management.dashboard(da).whenTerminated, Duration.Inf) 0 } def eventserver(ea: EventServerArgs): Int = { - Management.eventserver(ea).awaitTermination + Await.ready(Management.eventserver(ea).whenTerminated, Duration.Inf) 0 } def adminserver(aa: AdminServerArgs): Int = { - Management.adminserver(aa).awaitTermination + Await.ready(Management.adminserver(aa).whenTerminated, Duration.Inf) 0 } diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala index 102699601..0a1031dbd 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala @@ -18,60 +18,33 @@ package org.apache.predictionio.tools.dashboard -// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea - -import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins} -import spray.http.HttpHeaders._ -import spray.http.HttpEntity -import spray.routing._ -import spray.http.StatusCodes -import spray.http.ContentTypes - -// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS -trait CORSSupport { - this: HttpService => - - private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins) - private val optionsCorsHeaders = List( - `Access-Control-Allow-Headers`("""Origin, - |X-Requested-With, - |Content-Type, - |Accept, - |Accept-Encoding, - |Accept-Language, - |Host, - |Referer, - |User-Agent""".stripMargin.replace("\n", " ")), - `Access-Control-Max-Age`(1728000) - ) +// Reference from: https://gist.github.com/jeroenr/5261fa041d592f37cd80 + +import akka.http.scaladsl.model.HttpMethods._ +import akka.http.scaladsl.model.{StatusCodes, HttpResponse} +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.server.Directives._ +import 
akka.http.scaladsl.server.{Directive0, Route} +import com.typesafe.config.ConfigFactory + +trait CorsSupport { + + // this directive adds access control headers to normal responses + private def addAccessControlHeaders: Directive0 = { + respondWithHeaders( + `Access-Control-Allow-Origin`.forRange(HttpOriginRange.`*`), + `Access-Control-Allow-Credentials`(true), + `Access-Control-Allow-Headers`("Authorization", "Content-Type", "X-Requested-With") + ) + } - def cors[T]: Directive0 = mapRequestContext { ctx => - ctx.withRouteResponseHandling { - // OPTION request for a resource that responds to other methods - case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) && - x.exists(_.isInstanceOf[MethodRejection])) => { - val allowedMethods: List[HttpMethod] = x.collect { - case rejection: MethodRejection => rejection.supported - } - ctx.complete { - HttpResponse().withHeaders( - `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) :: - allowOriginHeader :: - optionsCorsHeaders - ) - } - } - }.withHttpResponseHeadersMapped { headers => - allowOriginHeader :: headers - } + // this handles preflight OPTIONS requests. 
+ private def preflightRequestHandler: Route = options { + complete(HttpResponse(StatusCodes.OK) + .withHeaders(`Access-Control-Allow-Methods`(OPTIONS, POST, PUT, GET, DELETE))) } - override def timeoutRoute: StandardRoute = complete { - HttpResponse( - StatusCodes.InternalServerError, - HttpEntity(ContentTypes.`text/plain(UTF-8)`, - "The server was not able to produce a timely response to your request."), - List(allowOriginHeader) - ) + def corsHandler(r: Route): Route = addAccessControlHeaders { + preflightRequestHandler ~ r } } diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala index 7d651b128..ddbf715c8 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala @@ -18,22 +18,23 @@ package org.apache.predictionio.tools.dashboard -import com.typesafe.config.ConfigFactory import org.apache.predictionio.authentication.KeyAuthentication -import org.apache.predictionio.configuration.SSLConfiguration import org.apache.predictionio.data.storage.Storage -import spray.can.server.ServerSettings -import scala.concurrent.ExecutionContext -import akka.actor.{ActorContext, Actor, ActorSystem, Props} -import akka.io.IO -import akka.pattern.ask -import akka.util.Timeout + +import scala.concurrent.{Await, ExecutionContext, Future} +import akka.actor.ActorSystem +import akka.http.scaladsl.server.directives.FutureDirectives.onSuccess import com.github.nscala_time.time.Imports.DateTime import grizzled.slf4j.Logging -import spray.can.Http -import spray.http._ -import spray.http.MediaTypes._ -import spray.routing._ +import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.directives._ +import akka.http.scaladsl.server._ 
+import akka.stream.ActorMaterializer +import akka.http.scaladsl.model.ContentTypes._ +import com.typesafe.config.ConfigFactory +import org.apache.predictionio.configuration.SSLConfiguration import scala.concurrent.duration._ @@ -41,7 +42,8 @@ case class DashboardConfig( ip: String = "localhost", port: Int = 9000) -object Dashboard extends Logging with SSLConfiguration { +object Dashboard extends Logging { + def main(args: Array[String]): Unit = { val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") { opt[String]("ip") action { (x, c) => @@ -53,108 +55,108 @@ object Dashboard extends Logging with SSLConfiguration { } parser.parse(args, DashboardConfig()) map { dc => - createDashboard(dc).awaitTermination + val f = DashboardServer.createDashboard(dc).whenTerminated + Await.result(f, Duration.Inf) } } +} + +object DashboardServer extends KeyAuthentication with CorsSupport with SSLConfiguration { + def createDashboard(dc: DashboardConfig): ActorSystem = { val systemName = "pio-dashboard" implicit val system = ActorSystem(systemName) - val service = - system.actorOf(Props(classOf[DashboardActor], dc), "dashboard") - implicit val timeout = Timeout(5.seconds) - val settings = ServerSettings(system) + implicit val materializer = ActorMaterializer() + implicit val executionContext = system.dispatcher val serverConfig = ConfigFactory.load("server.conf") val sslEnforced = serverConfig.getBoolean("org.apache.predictionio.server.ssl-enforced") - IO(Http) ? 
Http.Bind( - service, - interface = dc.ip, - port = dc.port, - settings = Some(settings.copy(sslEncryption = sslEnforced))) + val route = createRoute(DateTime.now, dc) + if(sslEnforced){ + val https: HttpsConnectionContext = ConnectionContext.https(sslContext) + Http().setDefaultServerHttpContext(https) + Http().bindAndHandle(route, dc.ip, dc.port, connectionContext = https) + } else { + Http().bindAndHandle(route, dc.ip, dc.port) + } system } -} -class DashboardActor( - val dc: DashboardConfig) - extends Actor with DashboardService { - def actorRefFactory: ActorContext = context - def receive: Actor.Receive = runRoute(dashboardRoute) -} + def createRoute(serverStartTime: DateTime, dc: DashboardConfig) + (implicit executionContext: ExecutionContext): Route = { + val evaluationInstances = Storage.getMetaDataEvaluationInstances + val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")) -trait DashboardService extends HttpService with KeyAuthentication with CORSSupport { - - implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher - val dc: DashboardConfig - val evaluationInstances = Storage.getMetaDataEvaluationInstances - val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")) - val serverStartTime = DateTime.now - val dashboardRoute = - path("") { - authenticate(withAccessKeyFromFile) { request => - get { - respondWithMediaType(`text/html`) { - complete { - val completedInstances = evaluationInstances.getCompleted - html.index( - dc, - serverStartTime, - pioEnvVars, - completedInstances).toString - } - } + def authenticate[T](authenticator: RequestContext => Future[Either[Rejection, T]]): + AuthenticationDirective[T] = { + extractRequestContext.flatMap { requestContext => + onSuccess(authenticator(requestContext)).flatMap { + case Right(x) => provide(x) + case Left(x) => reject(x): Directive1[T] } } - } ~ - pathPrefix("engine_instances" / Segment) { instanceId => - path("evaluator_results.txt") { - get { - 
respondWithMediaType(`text/plain`) { + } + + val route: Route = + path("") { + authenticate(withAccessKeyFromFile) { request => + get { + val completedInstances = evaluationInstances.getCompleted + complete(HttpResponse(entity = HttpEntity( + `text/html(UTF-8)`, + html.index(dc, serverStartTime, pioEnvVars, completedInstances).toString + ))) + } + } + } ~ + pathPrefix("engine_instances" / Segment) { instanceId => + path("evaluator_results.txt") { + get { evaluationInstances.get(instanceId).map { i => complete(i.evaluatorResults) } getOrElse { complete(StatusCodes.NotFound) } } - } - } ~ - path("evaluator_results.html") { - get { - respondWithMediaType(`text/html`) { + } ~ + path("evaluator_results.html") { + get { evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResultsHTML) + complete(HttpResponse( + entity = HttpEntity(`text/html(UTF-8)`, i.evaluatorResultsHTML))) } getOrElse { complete(StatusCodes.NotFound) } } - } - } ~ - path("evaluator_results.json") { - get { - respondWithMediaType(`application/json`) { + } ~ + path("evaluator_results.json") { + get { evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResultsJSON) + complete(HttpResponse( + entity = HttpEntity(`application/json`, i.evaluatorResultsJSON))) } getOrElse { complete(StatusCodes.NotFound) } } - } - } ~ - cors { - path("local_evaluator_results.json") { - get { - respondWithMediaType(`application/json`) { + } ~ + corsHandler { + path("local_evaluator_results.json") { + get { evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResultsJSON) + complete(HttpResponse( + entity = HttpEntity(`application/json`, i.evaluatorResultsJSON))) } getOrElse { complete(StatusCodes.NotFound) } } } + } ~ + pathPrefix("assets") { + getFromResourceDirectory("assets") } } - } ~ - pathPrefix("assets") { - getFromResourceDirectory("assets") - } + + route + } + } diff --git a/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala 
b/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala index e6c8bd39d..e554ebf80 100644 --- a/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala +++ b/tools/src/test/scala/org/apache/predictionio/tools/admin/AdminAPISpec.scala @@ -17,67 +17,19 @@ package org.apache.predictionio.tools.admin -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import org.apache.predictionio.data.storage.Storage import org.specs2.mutable.Specification -import spray.http._ -import spray.httpx.RequestBuilding._ -import spray.util._ +import akka.http.scaladsl.testkit.Specs2RouteTest - -class AdminAPISpec extends Specification{ - - val system = ActorSystem(Utils.actorSystemNameFrom(getClass)) - val config = AdminServerConfig( - ip = "localhost", - port = 7071) - - val commandClient = new CommandClient( - appClient = Storage.getMetaDataApps, - accessKeyClient = Storage.getMetaDataAccessKeys, - eventClient = Storage.getLEvents() - ) - - val adminActor= system.actorOf(Props(classOf[AdminServiceActor], commandClient)) +class AdminAPISpec extends Specification with Specs2RouteTest { + val route = AdminServer.createRoute() "GET / request" should { "properly produce OK HttpResponses" in { - val probe = TestProbe()(system) - probe.send(adminActor, Get("/")) - - probe.expectMsg( - HttpResponse( - 200, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"status":"alive"}""" - ) - ) - ) - success - } - } - - "GET /cmd/app request" should { - "properly produce OK HttpResponses" in { - /* - val probe = TestProbe()(system) - probe.send(adminActor,Get("/cmd/app")) - - //TODO: Need to convert the response string to the corresponding case object to assert some properties on the object - probe.expectMsg( - HttpResponse( - 200, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"status":1}""" - ) - ) - )*/ - pending + Get() ~> route ~> check { + response.status.intValue() shouldEqual 200 
+ responseAs[String] shouldEqual """{"status":"alive"}""" + } } } - step(system.shutdown()) } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move from spray to akka-http in servers > --------------------------------------- > > Key: PIO-31 > URL: https://issues.apache.org/jira/browse/PIO-31 > Project: PredictionIO > Issue Type: Improvement > Components: Core > Reporter: Marcin Ziemiński > Assignee: Naoki Takezoe > Priority: Major > Labels: gsoc2017, newbie > > On account of the death of spray for http and it being reborn as akka-http we > should update EventServer and Dashboard. It should be fairly simple, as > described in the following guide: > http://doc.akka.io/docs/akka/2.4/scala/http/migration-from-spray.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)