http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala new file mode 100644 index 0000000..7174ec8 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala @@ -0,0 +1,640 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +import akka.event.Logging +import sun.misc.BASE64Decoder + +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.io.IO +import akka.pattern.ask +import akka.util.Timeout +import org.apache.predictionio.data.Utils +import org.apache.predictionio.data.storage.AccessKeys +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.DateTimeJson4sSupport +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventJson4sSupport +import org.apache.predictionio.data.storage.BatchEventsJson4sSupport +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.Storage +import org.json4s.DefaultFormats +import org.json4s.Formats +import org.json4s.JObject +import org.json4s.native.JsonMethods.parse +import spray.can.Http +import spray.http.FormData +import spray.http.MediaTypes +import spray.http.StatusCodes +import spray.httpx.Json4sSupport +import spray.routing._ +import spray.routing.authentication.Authentication + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.{Try, Success, Failure} + +class EventServiceActor( + val eventClient: LEvents, + val accessKeysClient: AccessKeys, + val channelsClient: Channels, + val config: EventServerConfig) extends HttpServiceActor { + + object Json4sProtocol extends Json4sSupport { + implicit def json4sFormats: Formats = DefaultFormats + + new EventJson4sSupport.APISerializer + + new BatchEventsJson4sSupport.APISerializer + + // NOTE: don't use Json4s JodaTimeSerializers since it has issues, + // some format not converted, or timezone not correct + new DateTimeJson4sSupport.Serializer + } + + + val MaxNumberOfEventsPerBatchRequest = 50 + + val logger = Logging(context.system, this) + + // we use the enclosing ActorContext's or ActorSystem's dispatcher for our + // Futures + implicit def executionContext: ExecutionContext = context.dispatcher + + implicit val timeout = Timeout(5, TimeUnit.SECONDS) + + val rejectionHandler = Common.rejectionHandler + + val jsonPath = """(.+)\.json$""".r + val formPath = """(.+)\.form$""".r + + val pluginContext = EventServerPluginContext(logger) + + private lazy val base64Decoder = new BASE64Decoder + + case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String]) + + /* with 
accessKey in query/header, return appId if succeed */ + def withAccessKey: RequestContext => Future[Authentication[AuthData]] = { + ctx: RequestContext => + val accessKeyParamOpt = ctx.request.uri.query.get("accessKey") + val channelParamOpt = ctx.request.uri.query.get("channel") + Future { + // with accessKey in query, return appId if succeed + accessKeyParamOpt.map { accessKeyParam => + accessKeysClient.get(accessKeyParam).map { k => + channelParamOpt.map { ch => + val channelMap = + channelsClient.getByAppid(k.appid) + .map(c => (c.name, c.id)).toMap + if (channelMap.contains(ch)) { + Right(AuthData(k.appid, Some(channelMap(ch)), k.events)) + } else { + Left(ChannelRejection(s"Invalid channel '$ch'.")) + } + }.getOrElse{ + Right(AuthData(k.appid, None, k.events)) + } + }.getOrElse(FailedAuth) + }.getOrElse { + // with accessKey in header, return appId if succeed + ctx.request.headers.find(_.name == "Authorization").map { authHeader => + authHeader.value.split("Basic ") match { + case Array(_, value) => + val appAccessKey = + new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0) + accessKeysClient.get(appAccessKey) match { + case Some(k) => Right(AuthData(k.appid, None, k.events)) + case None => FailedAuth + } + + case _ => FailedAuth + } + }.getOrElse(MissedAuth) + } + } + } + + private val FailedAuth = Left( + AuthenticationFailedRejection( + AuthenticationFailedRejection.CredentialsRejected, List() + ) + ) + + private val MissedAuth = Left( + AuthenticationFailedRejection( + AuthenticationFailedRejection.CredentialsMissing, List() + ) + ) + + lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor") + lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor") + + val route: Route = + pathSingleSlash { + import Json4sProtocol._ + + get { + respondWithMediaType(MediaTypes.`application/json`) { + complete(Map("status" -> "alive")) + } + } + } ~ + path("plugins.json") { + import Json4sProtocol._ + get { + respondWithMediaType(MediaTypes.`application/json`) { + complete { + Map("plugins" -> Map( + "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) => + n -> Map( + "name" -> p.pluginName, + "description" -> p.pluginDescription, + "class" -> p.getClass.getName) + }, + "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) => + n -> Map( + "name" -> p.pluginName, + "description" -> p.pluginDescription, + "class" -> p.getClass.getName) + } + )) + } + } + } + } ~ + path("plugins" / Segments) { segments => + get { + handleExceptions(Common.exceptionHandler) { + authenticate(withAccessKey) { authData => + respondWithMediaType(MediaTypes.`application/json`) { + complete { + val pluginArgs = segments.drop(2) + val pluginType = segments(0) + val pluginName = segments(1) + pluginType match { + case EventServerPlugin.inputBlocker => + pluginContext.inputBlockers(pluginName).handleREST( + authData.appId, + authData.channelId, + pluginArgs) + case EventServerPlugin.inputSniffer => + pluginsActorRef ?
PluginsActor.HandleREST( + appId = authData.appId, + channelId = authData.channelId, + pluginName = pluginName, + pluginArgs = pluginArgs) map { + _.asInstanceOf[String] + } + } + } + } + } + } + } + } ~ + path("events" / jsonPath ) { eventId => + + import Json4sProtocol._ + + get { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + respondWithMediaType(MediaTypes.`application/json`) { + complete { + logger.debug(s"GET event ${eventId}.") + val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt => + eventOpt.map( event => + (StatusCodes.OK, event) + ).getOrElse( + (StatusCodes.NotFound, Map("message" -> "Not Found")) + ) + } + data + } + } + } + } + } + } ~ + delete { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + respondWithMediaType(MediaTypes.`application/json`) { + complete { + logger.debug(s"DELETE event ${eventId}.") + val data = eventClient.futureDelete(eventId, appId, channelId).map { found => + if (found) { + (StatusCodes.OK, Map("message" -> "Found")) + } else { + (StatusCodes.NotFound, Map("message" -> "Not Found")) + } + } + data + } + } + } + } + } + } + } ~ + path("events.json") { + + import Json4sProtocol._ + + post { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + val events = authData.events + entity(as[Event]) { event => + complete { + if (events.isEmpty || authData.events.contains(event.event)) { + pluginContext.inputBlockers.values.foreach( + _.process(EventInfo( + appId = appId, + channelId = channelId, + event = event), pluginContext)) + val data = eventClient.futureInsert(event, appId, channelId).map { id => + pluginsActorRef ! EventInfo( + appId = appId, + channelId = channelId, + event = event) + val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) + if (config.stats) { + statsActorRef ! 
Bookkeeping(appId, result._1, event) + } + result + } + data + } else { + (StatusCodes.Forbidden, + Map("message" -> s"${event.event} events are not allowed")) + } + } + } + } + } + } + } ~ + get { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + parameters( + 'startTime.as[Option[String]], + 'untilTime.as[Option[String]], + 'entityType.as[Option[String]], + 'entityId.as[Option[String]], + 'event.as[Option[String]], + 'targetEntityType.as[Option[String]], + 'targetEntityId.as[Option[String]], + 'limit.as[Option[Int]], + 'reversed.as[Option[Boolean]]) { + (startTimeStr, untilTimeStr, entityType, entityId, + eventName, // only support one event name + targetEntityType, targetEntityId, + limit, reversed) => + respondWithMediaType(MediaTypes.`application/json`) { + complete { + logger.debug( + s"GET events of appId=${appId} " + + s"st=${startTimeStr} ut=${untilTimeStr} " + + s"et=${entityType} eid=${entityId} " + + s"li=${limit} rev=${reversed} ") + + require(!((reversed == Some(true)) + && (entityType.isEmpty || entityId.isEmpty)), + "the parameter reversed can only be used with" + + " both entityType and entityId specified.") + + val parseTime = Future { + val startTime = startTimeStr.map(Utils.stringToDateTime(_)) + val untilTime = untilTimeStr.map(Utils.stringToDateTime(_)) + (startTime, untilTime) + } + + + parseTime.flatMap { case (startTime, untilTime) => + val data = eventClient.futureFind( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventName.map(List(_)), + targetEntityType = targetEntityType.map(Some(_)), + targetEntityId = targetEntityId.map(Some(_)), + limit = limit.orElse(Some(20)), + reversed = reversed) + .map { eventIter => + if (eventIter.hasNext) { + (StatusCodes.OK, eventIter.toArray) + } else { + (StatusCodes.NotFound, + Map("message" -> "Not Found")) + } + } + data + }.recover { + case e: Exception => + (StatusCodes.BadRequest, Map("message" -> s"${e}")) + } + } + } + } + } + } + } + } + } ~ + path("batch" / "events.json") { + + import Json4sProtocol._ + + post { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + val allowedEvents = authData.events + val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = { + case Success(event) => { + if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) { + pluginContext.inputBlockers.values.foreach( + _.process(EventInfo( + appId = appId, + channelId = channelId, + event = event), pluginContext)) + val data = eventClient.futureInsert(event, appId, channelId).map { id => + pluginsActorRef ! EventInfo( + appId = appId, + channelId = channelId, + event = event) + val status = StatusCodes.Created + val result = Map( + "status" -> status.intValue, + "eventId" -> s"${id}") + if (config.stats) { + statsActorRef ! 
Bookkeeping(appId, status, event) + } + result + }.recover { case exception => + Map( + "status" -> StatusCodes.InternalServerError.intValue, + "message" -> s"${exception.getMessage()}") + } + data + } else { + Future.successful(Map( + "status" -> StatusCodes.Forbidden.intValue, + "message" -> s"${event.event} events are not allowed")) + } + } + case Failure(exception) => { + Future.successful(Map( + "status" -> StatusCodes.BadRequest.intValue, + "message" -> s"${exception.getMessage()}")) + } + } + + entity(as[Seq[Try[Event]]]) { events => + complete { + if (events.length <= MaxNumberOfEventsPerBatchRequest) { + Future.traverse(events)(handleEvent) + } else { + (StatusCodes.BadRequest, + Map("message" -> (s"Batch request must have less than or equal to " + + s"${MaxNumberOfEventsPerBatchRequest} events"))) + } + } + } + } + } + } + } + } ~ + path("stats.json") { + + import Json4sProtocol._ + + get { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + respondWithMediaType(MediaTypes.`application/json`) { + if (config.stats) { + complete { + statsActorRef ? GetStats(appId) map { + _.asInstanceOf[Map[String, StatsSnapshot]] + } + } + } else { + complete( + StatusCodes.NotFound, + parse("""{"message": "To see stats, launch Event Server """ + + """with --stats argument."}""")) + } + } + } + } + } + } // stats.json get + } ~ + path("webhooks" / jsonPath ) { web => + import Json4sProtocol._ + + post { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + respondWithMediaType(MediaTypes.`application/json`) { + entity(as[JObject]) { jObj => + complete { + Webhooks.postJson( + appId = appId, + channelId = channelId, + web = web, + data = jObj, + eventClient = eventClient, + log = logger, + stats = config.stats, + statsActorRef = statsActorRef) + } + } + } + } + } + } + } ~ + get { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + respondWithMediaType(MediaTypes.`application/json`) { + complete { + Webhooks.getJson( + appId = appId, + channelId = channelId, + web = web, + log = logger) + } + } + } + } + } + } + } ~ + path("webhooks" / formPath ) { web => + post { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + respondWithMediaType(MediaTypes.`application/json`) { + entity(as[FormData]){ formData => + // logger.debug(formData.toString) + complete { + // respond with JSON + import Json4sProtocol._ + + Webhooks.postForm( + appId = appId, + channelId = channelId, + web = web, + data = formData, + eventClient = eventClient, + log = logger, + stats = config.stats, + statsActorRef = statsActorRef) + } + } + } + } + } + } + } ~ + get { + handleExceptions(Common.exceptionHandler) { + handleRejections(rejectionHandler) { + authenticate(withAccessKey) { authData => + val appId = authData.appId + val channelId = authData.channelId + respondWithMediaType(MediaTypes.`application/json`) { + complete { + // respond with JSON + import Json4sProtocol._ + + Webhooks.getForm( + appId = appId, + channelId = channelId, + web = web, + log = logger) + } + } + } + } + } + } + + } + + def 
receive: Actor.Receive = runRoute(route) +} + + + +/* message */ +case class StartServer(host: String, port: Int) + +class EventServerActor( + val eventClient: LEvents, + val accessKeysClient: AccessKeys, + val channelsClient: Channels, + val config: EventServerConfig) extends Actor with ActorLogging { + val child = context.actorOf( + Props(classOf[EventServiceActor], + eventClient, + accessKeysClient, + channelsClient, + config), + "EventServiceActor") + implicit val system = context.system + + def receive: Actor.Receive = { + case StartServer(host, portNum) => { + IO(Http) ! Http.Bind(child, interface = host, port = portNum) + } + case m: Http.Bound => log.info("Bound received. EventServer is ready.") + case m: Http.CommandFailed => log.error("Command failed.") + case _ => log.error("Unknown message.") + } +} + +case class EventServerConfig( + ip: String = "localhost", + port: Int = 7070, + plugins: String = "plugins", + stats: Boolean = false) + +object EventServer { + def createEventServer(config: EventServerConfig): Unit = { + implicit val system = ActorSystem("EventServerSystem") + + val eventClient = Storage.getLEvents() + val accessKeysClient = Storage.getMetaDataAccessKeys() + val channelsClient = Storage.getMetaDataChannels() + + val serverActor = system.actorOf( + Props( + classOf[EventServerActor], + eventClient, + accessKeysClient, + channelsClient, + config), + "EventServerActor" + ) + if (config.stats) system.actorOf(Props[StatsActor], "StatsActor") + system.actorOf(Props[PluginsActor], "PluginsActor") + serverActor ! StartServer(config.ip, config.port) + system.awaitTermination() + } +} + +object Run { + def main(args: Array[String]) { + EventServer.createEventServer(EventServerConfig( + ip = "0.0.0.0", + port = 7070)) + } +}
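For context, a minimal sketch of how a third-party plugin could hook into the routes above. It is written against the EventServerPlugin trait and the ServiceLoader-based discovery introduced in EventServerPlugin.scala and EventServerPluginContext.scala below; the package, class name, and logging behaviour here are illustrative assumptions, not part of this commit.

package org.example.sniffer // hypothetical package, not part of this commit

import org.apache.predictionio.data.api.{EventInfo, EventServerPlugin, EventServerPluginContext}

class LoggingSniffer extends EventServerPlugin {
  val pluginName = "loggingsniffer"
  val pluginDescription = "Logs every event accepted by the Event Server"
  val pluginType = EventServerPlugin.inputSniffer

  // Called once when the plugin context is built; nothing to initialize here.
  def start(context: EventServerPluginContext): Unit = ()

  // Invoked for every EventInfo forwarded from the event-ingestion routes.
  def process(eventInfo: EventInfo, context: EventServerPluginContext): Unit =
    context.log.info(
      s"app ${eventInfo.appId} channel ${eventInfo.channelId}: ${eventInfo.event.event}")

  // Answers GET /plugins/inputsniffer/loggingsniffer/... via PluginsActor.HandleREST.
  def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String =
    s"""{"plugin": "$pluginName", "appId": $appId, "args": ${arguments.size}}"""
}

Assuming a META-INF/services/org.apache.predictionio.data.api.EventServerPlugin entry naming this class is on the classpath, EventServerPluginContext would register it at startup, it would be listed by GET /plugins.json, and its handleREST output would be served through the PluginsActor ask shown in the route above.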
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala new file mode 100644 index 0000000..c4918c2 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPlugin.scala @@ -0,0 +1,33 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +trait EventServerPlugin { + val pluginName: String + val pluginDescription: String + val pluginType: String + + def start(context: EventServerPluginContext): Unit + + def process(eventInfo: EventInfo, context: EventServerPluginContext) + + def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String +} + +object EventServerPlugin { + val inputBlocker = "inputblocker" + val inputSniffer = "inputsniffer" +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala new file mode 100644 index 0000000..db5743b --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala @@ -0,0 +1,49 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.api + +import java.util.ServiceLoader + +import akka.event.LoggingAdapter +import grizzled.slf4j.Logging + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +class EventServerPluginContext( + val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]], + val log: LoggingAdapter) { + def inputBlockers: Map[String, EventServerPlugin] = + plugins.getOrElse(EventServerPlugin.inputBlocker, Map()).toMap + + def inputSniffers: Map[String, EventServerPlugin] = + plugins.getOrElse(EventServerPlugin.inputSniffer, Map()).toMap +} + +object EventServerPluginContext extends Logging { + def apply(log: LoggingAdapter): EventServerPluginContext = { + val plugins = mutable.Map[String, mutable.Map[String, EventServerPlugin]]( + EventServerPlugin.inputBlocker -> mutable.Map(), + EventServerPlugin.inputSniffer -> mutable.Map()) + val serviceLoader = ServiceLoader.load(classOf[EventServerPlugin]) + serviceLoader foreach { service => + plugins(service.pluginType) += service.pluginName -> service + } + new EventServerPluginContext( + plugins, + log) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala new file mode 100644 index 0000000..e6c1ae8 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/PluginsActor.scala @@ -0,0 +1,52 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +import akka.actor.Actor +import akka.event.Logging + +class PluginsActor() extends Actor { + implicit val system = context.system + val log = Logging(system, this) + + val pluginContext = EventServerPluginContext(log) + + def receive: PartialFunction[Any, Unit] = { + case e: EventInfo => + pluginContext.inputSniffers.values.foreach(_.process(e, pluginContext)) + case h: PluginsActor.HandleREST => + try { + sender() ! pluginContext.inputSniffers(h.pluginName).handleREST( + h.appId, + h.channelId, + h.pluginArgs) + } catch { + case e: Exception => + sender() ! 
s"""{"message":"${e.getMessage}"}""" + } + case _ => + log.error("Unknown message sent to Event Server input sniffer plugin host.") + } +} + +object PluginsActor { + case class HandleREST( + pluginName: String, + appId: Int, + channelId: Option[Int], + pluginArgs: Seq[String]) +} + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala new file mode 100644 index 0000000..231d101 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala @@ -0,0 +1,79 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.storage.Event + +import spray.http.StatusCode + +import scala.collection.mutable.{ HashMap => MHashMap } +import scala.collection.mutable + +import com.github.nscala_time.time.Imports.DateTime + +case class EntityTypesEvent( + val entityType: String, + val targetEntityType: Option[String], + val event: String) { + + def this(e: Event) = this( + e.entityType, + e.targetEntityType, + e.event) +} + +case class KV[K, V](key: K, value: V) + +case class StatsSnapshot( + val startTime: DateTime, + val endTime: Option[DateTime], + val basic: Seq[KV[EntityTypesEvent, Long]], + val statusCode: Seq[KV[StatusCode, Long]] +) + + +class Stats(val startTime: DateTime) { + private[this] var _endTime: Option[DateTime] = None + var statusCodeCount = MHashMap[(Int, StatusCode), Long]().withDefaultValue(0L) + var eteCount = MHashMap[(Int, EntityTypesEvent), Long]().withDefaultValue(0L) + + def cutoff(endTime: DateTime) { + _endTime = Some(endTime) + } + + def update(appId: Int, statusCode: StatusCode, event: Event) { + statusCodeCount((appId, statusCode)) += 1 + eteCount((appId, new EntityTypesEvent(event))) += 1 + } + + def extractByAppId[K, V](appId: Int, m: mutable.Map[(Int, K), V]) + : Seq[KV[K, V]] = { + m + .toSeq + .flatMap { case (k, v) => + if (k._1 == appId) { Seq(KV(k._2, v)) } else { Seq() } + } + } + + def get(appId: Int): StatsSnapshot = { + StatsSnapshot( + startTime, + _endTime, + extractByAppId(appId, eteCount), + extractByAppId(appId, statusCodeCount) + ) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala new file mode 100644 index 0000000..a8ed3e7 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/StatsActor.scala @@ -0,0 +1,74 @@ +/** Copyright 2015 TappingStone, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.storage.Event + +import spray.http.StatusCode + +import akka.actor.Actor +import akka.event.Logging + +import com.github.nscala_time.time.Imports.DateTime + +/* message to StatsActor */ +case class Bookkeeping(val appId: Int, statusCode: StatusCode, event: Event) + +/* message to StatsActor */ +case class GetStats(val appId: Int) + +class StatsActor extends Actor { + implicit val system = context.system + val log = Logging(system, this) + + def getCurrent: DateTime = { + DateTime.now. + withMinuteOfHour(0). + withSecondOfMinute(0). + withMillisOfSecond(0) + } + + var longLiveStats = new Stats(DateTime.now) + var hourlyStats = new Stats(getCurrent) + + var prevHourlyStats = new Stats(getCurrent.minusHours(1)) + prevHourlyStats.cutoff(hourlyStats.startTime) + + def bookkeeping(appId: Int, statusCode: StatusCode, event: Event) { + val current = getCurrent + // If the current hour is different from the stats start time, we create + // another stats instance, and move the current to prev. + if (current != hourlyStats.startTime) { + prevHourlyStats = hourlyStats + prevHourlyStats.cutoff(current) + hourlyStats = new Stats(current) + } + + hourlyStats.update(appId, statusCode, event) + longLiveStats.update(appId, statusCode, event) + } + + def receive: Actor.Receive = { + case Bookkeeping(appId, statusCode, event) => + bookkeeping(appId, statusCode, event) + case GetStats(appId) => sender() ! Map( + "time" -> DateTime.now, + "currentHour" -> hourlyStats.get(appId), + "prevHour" -> prevHourlyStats.get(appId), + "longLive" -> longLiveStats.get(appId)) + case _ => log.error("Unknown message.") + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala new file mode 100644 index 0000000..04ff78f --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala @@ -0,0 +1,151 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.webhooks.JsonConnector +import org.apache.predictionio.data.webhooks.FormConnector +import org.apache.predictionio.data.webhooks.ConnectorUtil +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventJson4sSupport +import org.apache.predictionio.data.storage.LEvents + +import spray.routing._ +import spray.routing.Directives._ +import spray.http.StatusCodes +import spray.http.StatusCode +import spray.http.FormData +import spray.httpx.Json4sSupport + +import org.json4s.Formats +import org.json4s.DefaultFormats +import org.json4s.JObject + +import akka.event.LoggingAdapter +import akka.actor.ActorSelection + +import scala.concurrent.{ExecutionContext, Future} + + +private[prediction] object Webhooks { + + def postJson( + appId: Int, + channelId: Option[Int], + web: String, + data: JObject, + eventClient: LEvents, + log: LoggingAdapter, + stats: Boolean, + statsActorRef: ActorSelection + )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { + + val eventFuture = Future { + WebhooksConnectors.json.get(web).map { connector => + ConnectorUtil.toEvent(connector, data) + } + } + + eventFuture.flatMap { eventOpt => + if (eventOpt.isEmpty) { + Future successful { + val message = s"webhooks connection for ${web} is not supported." + (StatusCodes.NotFound, Map("message" -> message)) + } + } else { + val event = eventOpt.get + val data = eventClient.futureInsert(event, appId, channelId).map { id => + val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) + + if (stats) { + statsActorRef ! Bookkeeping(appId, result._1, event) + } + result + } + data + } + } + } + + def getJson( + appId: Int, + channelId: Option[Int], + web: String, + log: LoggingAdapter + )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { + Future { + WebhooksConnectors.json.get(web).map { connector => + (StatusCodes.OK, Map("message" -> "Ok")) + }.getOrElse { + val message = s"webhooks connection for ${web} is not supported." + (StatusCodes.NotFound, Map("message" -> message)) + } + } + } + + def postForm( + appId: Int, + channelId: Option[Int], + web: String, + data: FormData, + eventClient: LEvents, + log: LoggingAdapter, + stats: Boolean, + statsActorRef: ActorSelection + )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { + val eventFuture = Future { + WebhooksConnectors.form.get(web).map { connector => + ConnectorUtil.toEvent(connector, data.fields.toMap) + } + } + + eventFuture.flatMap { eventOpt => + if (eventOpt.isEmpty) { + Future { + val message = s"webhooks connection for ${web} is not supported." + (StatusCodes.NotFound, Map("message" -> message)) + } + } else { + val event = eventOpt.get + val data = eventClient.futureInsert(event, appId, channelId).map { id => + val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) + + if (stats) { + statsActorRef ! Bookkeeping(appId, result._1, event) + } + result + } + data + } + } + } + + def getForm( + appId: Int, + channelId: Option[Int], + web: String, + log: LoggingAdapter + )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { + Future { + WebhooksConnectors.form.get(web).map { connector => + (StatusCodes.OK, Map("message" -> "Ok")) + }.getOrElse { + val message = s"webhooks connection for ${web} is not supported." 
+ (StatusCodes.NotFound, Map("message" -> message)) + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala b/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala new file mode 100644 index 0000000..c2578ee --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/WebhooksConnectors.scala @@ -0,0 +1,34 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.webhooks.JsonConnector +import org.apache.predictionio.data.webhooks.FormConnector + +import org.apache.predictionio.data.webhooks.segmentio.SegmentIOConnector +import org.apache.predictionio.data.webhooks.mailchimp.MailChimpConnector + +private[prediction] object WebhooksConnectors { + + val json: Map[String, JsonConnector] = Map( + "segmentio" -> SegmentIOConnector + ) + + val form: Map[String, FormConnector] = Map( + "mailchimp" -> MailChimpConnector + ) + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/package.scala b/data/src/main/scala/org/apache/predictionio/data/package.scala new file mode 100644 index 0000000..9284787 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/package.scala @@ -0,0 +1,21 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio + +/** Provides data access for PredictionIO and any engines running on top of + * PredictionIO + */ +package object data {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala new file mode 100644 index 0000000..3285de9 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/AccessKeys.scala @@ -0,0 +1,71 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import java.security.SecureRandom + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.commons.codec.binary.Base64 + +/** :: DeveloperApi :: + * Stores mapping of access keys, app IDs, and lists of allowed event names + * + * @param key Access key + * @param appid App ID + * @param events List of allowed events for this particular app key + * @group Meta Data + */ +@DeveloperApi +case class AccessKey( + key: String, + appid: Int, + events: Seq[String]) + +/** :: DeveloperApi :: + * Base trait of the [[AccessKey]] data access object + * + * @group Meta Data + */ +@DeveloperApi +trait AccessKeys { + /** Insert a new [[AccessKey]]. If the key field is empty, a key will be + * generated. + */ + def insert(k: AccessKey): Option[String] + + /** Get an [[AccessKey]] by key */ + def get(k: String): Option[AccessKey] + + /** Get all [[AccessKey]]s */ + def getAll(): Seq[AccessKey] + + /** Get all [[AccessKey]]s for a particular app ID */ + def getByAppid(appid: Int): Seq[AccessKey] + + /** Update an [[AccessKey]] */ + def update(k: AccessKey): Unit + + /** Delete an [[AccessKey]] */ + def delete(k: String): Unit + + /** Default implementation of key generation */ + def generateKey: String = { + val sr = SecureRandom.getInstanceStrong + val srBytes = Array.fill(48)(0.toByte) + sr.nextBytes(srBytes) + Base64.encodeBase64URLSafeString(srBytes) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala new file mode 100644 index 0000000..b68e1b6 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Apps.scala @@ -0,0 +1,58 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi + +/** :: DeveloperApi :: + * Stores mapping of app IDs and names + * + * @param id ID of the app. + * @param name Name of the app. + * @param description Long description of the app. + * @group Meta Data + */ +@DeveloperApi +case class App( + id: Int, + name: String, + description: Option[String]) + +/** :: DeveloperApi :: + * Base trait of the [[App]] data access object + * + * @group Meta Data + */ +@DeveloperApi +trait Apps { + /** Insert a new [[App]]. Returns a generated app ID if the supplied app ID is 0. */ + def insert(app: App): Option[Int] + + /** Get an [[App]] by app ID */ + def get(id: Int): Option[App] + + /** Get an [[App]] by app name */ + def getByName(name: String): Option[App] + + /** Get all [[App]]s */ + def getAll(): Seq[App] + + /** Update an [[App]] */ + def update(app: App): Unit + + /** Delete an [[App]] */ + def delete(id: Int): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala new file mode 100644 index 0000000..ad845b3 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/BiMap.scala @@ -0,0 +1,164 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import scala.collection.immutable.HashMap + +import org.apache.spark.rdd.RDD + +/** Immutable Bi-directional Map + * + */ +class BiMap[K, V] private[prediction] ( + private val m: Map[K, V], + private val i: Option[BiMap[V, K]] = None + ) extends Serializable { + + // NOTE: make inverse's inverse point back to current BiMap + val inverse: BiMap[V, K] = i.getOrElse { + val rev = m.map(_.swap) + require((rev.size == m.size), + s"Failed to create reversed map. Cannot have duplicated values.") + new BiMap(rev, Some(this)) + } + + def get(k: K): Option[V] = m.get(k) + + def getOrElse(k: K, default: => V): V = m.getOrElse(k, default) + + def contains(k: K): Boolean = m.contains(k) + + def apply(k: K): V = m.apply(k) + + /** Converts to a map. + * @return a map of type immutable.Map[K, V] + */ + def toMap: Map[K, V] = m + + /** Converts to a sequence. 
+ * @return a sequence containing all elements of this map + */ + def toSeq: Seq[(K, V)] = m.toSeq + + def size: Int = m.size + + def take(n: Int): BiMap[K, V] = BiMap(m.take(n)) + + override def toString: String = m.toString +} + +object BiMap { + + def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x) + + /** Create a BiMap[String, Long] from a set of String. The Long index starts + * from 0. + * @param keys a set of String + * @return a String to Long BiMap + */ + def stringLong(keys: Set[String]): BiMap[String, Long] = { + val hm = HashMap(keys.toSeq.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*) + new BiMap(hm) + } + + /** Create a BiMap[String, Long] from an array of String. + * NOTE: the the array cannot have duplicated element. + * The Long index starts from 0. + * @param keys a set of String + * @return a String to Long BiMap + */ + def stringLong(keys: Array[String]): BiMap[String, Long] = { + val hm = HashMap(keys.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*) + new BiMap(hm) + } + + /** Create a BiMap[String, Long] from RDD[String]. The Long index starts + * from 0. + * @param keys RDD of String + * @return a String to Long BiMap + */ + def stringLong(keys: RDD[String]): BiMap[String, Long] = { + stringLong(keys.distinct.collect) + } + + /** Create a BiMap[String, Int] from a set of String. The Int index starts + * from 0. + * @param keys a set of String + * @return a String to Int BiMap + */ + def stringInt(keys: Set[String]): BiMap[String, Int] = { + val hm = HashMap(keys.toSeq.zipWithIndex : _*) + new BiMap(hm) + } + + /** Create a BiMap[String, Int] from an array of String. + * NOTE: the the array cannot have duplicated element. + * The Int index starts from 0. + * @param keys a set of String + * @return a String to Int BiMap + */ + def stringInt(keys: Array[String]): BiMap[String, Int] = { + val hm = HashMap(keys.zipWithIndex : _*) + new BiMap(hm) + } + + /** Create a BiMap[String, Int] from RDD[String]. The Int index starts + * from 0. + * @param keys RDD of String + * @return a String to Int BiMap + */ + def stringInt(keys: RDD[String]): BiMap[String, Int] = { + stringInt(keys.distinct.collect) + } + + private[this] def stringDoubleImpl(keys: Seq[String]) + : BiMap[String, Double] = { + val ki = keys.zipWithIndex.map(e => (e._1, e._2.toDouble)) + new BiMap(HashMap(ki : _*)) + } + + /** Create a BiMap[String, Double] from a set of String. The Double index + * starts from 0. + * @param keys a set of String + * @return a String to Double BiMap + */ + def stringDouble(keys: Set[String]): BiMap[String, Double] = { + // val hm = HashMap(keys.toSeq.zipWithIndex.map(_.toDouble) : _*) + // new BiMap(hm) + stringDoubleImpl(keys.toSeq) + } + + /** Create a BiMap[String, Double] from an array of String. + * NOTE: the the array cannot have duplicated element. + * The Double index starts from 0. + * @param keys a set of String + * @return a String to Double BiMap + */ + def stringDouble(keys: Array[String]): BiMap[String, Double] = { + // val hm = HashMap(keys.zipWithIndex.mapValues(_.toDouble) : _*) + // new BiMap(hm) + stringDoubleImpl(keys.toSeq) + } + + /** Create a BiMap[String, Double] from RDD[String]. The Double index starts + * from 0. 
+ * @param keys RDD of String + * @return a String to Double BiMap + */ + def stringDouble(keys: RDD[String]): BiMap[String, Double] = { + stringDoubleImpl(keys.distinct.collect) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala new file mode 100644 index 0000000..e602e1e --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Channels.scala @@ -0,0 +1,79 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi + +/** :: DeveloperApi :: + * Stores mapping of channel IDs, names and app ID + * + * @param id ID of the channel + * @param name Name of the channel (must be unique within the same app) + * @param appid ID of the app which this channel belongs to + * @group Meta Data + */ +@DeveloperApi +case class Channel( + id: Int, + name: String, // must be unique within the same app + appid: Int +) { + require(Channel.isValidName(name), + "Invalid channel name: ${name}. ${Channel.nameConstraint}") +} + +/** :: DeveloperApi :: + * Companion object of [[Channel]] + * + * @group Meta Data + */ +@DeveloperApi +object Channel { + /** Examine whether the supplied channel name is valid. A valid channel name + * must consists of 1 to 16 alphanumeric and '-' characters. + * + * @param s Channel name to examine + * @return true if channel name is valid, false otherwise + */ + def isValidName(s: String): Boolean = { + // note: update channelNameConstraint if this rule is changed + s.matches("^[a-zA-Z0-9-]{1,16}$") + } + + /** For consistent error message display */ + val nameConstraint: String = + "Only alphanumeric and - characters are allowed and max length is 16." +} + +/** :: DeveloperApi :: + * Base trait of the [[Channel]] data access object + * + * @group Meta Data + */ +@DeveloperApi +trait Channels { + /** Insert a new [[Channel]]. Returns a generated channel ID if original ID is 0. 
*/ + def insert(channel: Channel): Option[Int] + + /** Get a [[Channel]] by channel ID */ + def get(id: Int): Option[Channel] + + /** Get all [[Channel]] by app ID */ + def getByAppid(appid: Int): Seq[Channel] + + /** Delete a [[Channel]] */ + def delete(id: Int): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala new file mode 100644 index 0000000..93b6f51 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/DataMap.scala @@ -0,0 +1,241 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.json4s._ +import org.json4s.native.JsonMethods.parse + +import scala.collection.GenTraversableOnce +import scala.collection.JavaConversions + +/** Exception class for [[DataMap]] + * + * @group Event Data + */ +case class DataMapException(msg: String, cause: Exception) + extends Exception(msg, cause) { + def this(msg: String) = this(msg, null) +} + +/** A DataMap stores properties of the event or entity. Internally it is a Map + * whose keys are property names and values are corresponding JSON values + * respectively. Use the [[get]] method to retrieve the value of a mandatory + * property or use [[getOpt]] to retrieve the value of an optional property. + * + * @param fields Map of property name to JValue + * @group Event Data + */ +class DataMap ( + val fields: Map[String, JValue] +) extends Serializable { + @transient lazy implicit private val formats = DefaultFormats + + new DateTimeJson4sSupport.Serializer + + /** Check the existence of a required property name. Throw an exception if + * it does not exist. + * + * @param name The property name + */ + def require(name: String): Unit = { + if (!fields.contains(name)) { + throw new DataMapException(s"The field $name is required.") + } + } + + /** Check if this DataMap contains a specific property. + * + * @param name The property name + * @return Return true if the property exists, else false. + */ + def contains(name: String): Boolean = { + fields.contains(name) + } + + /** Get the value of a mandatory property. Exception is thrown if the property + * does not exist. + * + * @tparam T The type of the property value + * @param name The property name + * @return Return the property value of type T + */ + def get[T: Manifest](name: String): T = { + require(name) + fields(name) match { + case JNull => throw new DataMapException( + s"The required field $name cannot be null.") + case x: JValue => x.extract[T] + } + } + + /** Get the value of an optional property. Return None if the property does + * not exist. 
+ * + * @tparam T The type of the property value + * @param name The property name + * @return Return the property value of type Option[T] + */ + def getOpt[T: Manifest](name: String): Option[T] = { + // either the field doesn't exist or its value is null + fields.get(name).flatMap(_.extract[Option[T]]) + } + + /** Get the value of an optional property. Return default value if the + * property does not exist. + * + * @tparam T The type of the property value + * @param name The property name + * @param default The default property value of type T + * @return Return the property value of type T + */ + def getOrElse[T: Manifest](name: String, default: T): T = { + getOpt[T](name).getOrElse(default) + } + + /** Java-friendly method for getting the value of a property. Return null if the + * property does not exist. + * + * @tparam T The type of the property value + * @param name The property name + * @param clazz The class of the type of the property value + * @return Return the property value of type T + */ + def get[T](name: String, clazz: java.lang.Class[T]): T = { + val manifest = new Manifest[T] { + override def erasure: Class[_] = clazz + override def runtimeClass: Class[_] = clazz + } + + fields.get(name) match { + case None => null.asInstanceOf[T] + case Some(JNull) => null.asInstanceOf[T] + case Some(x) => x.extract[T](formats, manifest) + } + } + + /** Java-friendly method for getting a list of values of a property. Return null if the + * property does not exist. + * + * @param name The property name + * @return Return the list of property values + */ + def getStringList(name: String): java.util.List[String] = { + fields.get(name) match { + case None => null + case Some(JNull) => null + case Some(x) => + JavaConversions.seqAsJavaList(x.extract[List[String]](formats, manifest[List[String]])) + } + } + + /** Return a new DataMap with elements containing elements from the left hand + * side operand followed by elements from the right hand side operand. + * + * @param that Right hand side DataMap + * @return A new DataMap + */ + def ++ (that: DataMap): DataMap = DataMap(this.fields ++ that.fields) + + /** Creates a new DataMap from this DataMap by removing all elements of + * another collection. + * + * @param that A collection containing the removed property names + * @return A new DataMap + */ + def -- (that: GenTraversableOnce[String]): DataMap = + DataMap(this.fields -- that) + + /** Tests whether the DataMap is empty. + * + * @return true if the DataMap is empty, false otherwise. + */ + def isEmpty: Boolean = fields.isEmpty + + /** Collects all property names of this DataMap in a set. + * + * @return a set containing all property names of this DataMap. + */ + def keySet: Set[String] = this.fields.keySet + + /** Converts this DataMap to a List. + * + * @return a list of (property name, JSON value) tuples. + */ + def toList(): List[(String, JValue)] = fields.toList + + /** Converts this DataMap to a JObject. + * + * @return the JObject initialized by this DataMap. + */ + def toJObject(): JObject = JObject(toList()) + + /** Converts this DataMap to case class of type T. + * + * @return the object of type T. 
+ */ + def extract[T: Manifest]: T = { + toJObject().extract[T] + } + + override + def toString: String = s"DataMap($fields)" + + override + def hashCode: Int = 41 + fields.hashCode + + override + def equals(other: Any): Boolean = other match { + case that: DataMap => that.canEqual(this) && this.fields.equals(that.fields) + case _ => false + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[DataMap] +} + +/** Companion object of the [[DataMap]] class + * + * @group Event Data + */ +object DataMap { + /** Create an empty DataMap + * @return an empty DataMap + */ + def apply(): DataMap = new DataMap(Map[String, JValue]()) + + /** Create an DataMap from a Map of String to JValue + * @param fields a Map of String to JValue + * @return a new DataMap initialized by fields + */ + def apply(fields: Map[String, JValue]): DataMap = new DataMap(fields) + + /** Create an DataMap from a JObject + * @param jObj JObject + * @return a new DataMap initialized by a JObject + */ + def apply(jObj: JObject): DataMap = { + if (jObj == null) { + apply() + } else { + new DataMap(jObj.obj.toMap) + } + } + + /** Create an DataMap from a JSON String + * @param js JSON String. eg """{ "a": 1, "b": "foo" }""" + * @return a new DataMap initialized by a JSON string + */ + def apply(js: String): DataMap = apply(parse(js).asInstanceOf[JObject]) + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala new file mode 100644 index 0000000..b3789a4 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/DateTimeJson4sSupport.scala @@ -0,0 +1,47 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.data.{Utils => DataUtils} +import org.joda.time.DateTime +import org.json4s._ + +/** :: DeveloperApi :: + * JSON4S serializer for Joda-Time + * + * @group Common + */ +@DeveloperApi +object DateTimeJson4sSupport { + + @transient lazy implicit val formats = DefaultFormats + + /** Serialize DateTime to JValue */ + def serializeToJValue: PartialFunction[Any, JValue] = { + case d: DateTime => JString(DataUtils.dateTimeToString(d)) + } + + /** Deserialize JValue to DateTime */ + def deserializeFromJValue: PartialFunction[JValue, DateTime] = { + case jv: JValue => DataUtils.stringToDateTime(jv.extract[String]) + } + + /** Custom JSON4S serializer for Joda-Time */ + class Serializer extends CustomSerializer[DateTime](format => ( + deserializeFromJValue, serializeToJValue)) + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala new file mode 100644 index 0000000..bc71f3f --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala @@ -0,0 +1,177 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import com.github.nscala_time.time.Imports._ +import org.apache.predictionio.annotation.DeveloperApi +import org.json4s._ + +/** :: DeveloperApi :: + * Stores parameters, model, and other information for each engine instance + * + * @param id Engine instance ID. + * @param status Status of the engine instance. + * @param startTime Start time of the training/evaluation. + * @param endTime End time of the training/evaluation. + * @param engineId Engine ID of the instance. + * @param engineVersion Engine version of the instance. + * @param engineVariant Engine variant ID of the instance. + * @param engineFactory Engine factory class for the instance. + * @param batch A batch label of the engine instance. + * @param env The environment in which the instance was created. + * @param sparkConf Custom Spark configuration of the instance. + * @param dataSourceParams Data source parameters of the instance. + * @param preparatorParams Preparator parameters of the instance. + * @param algorithmsParams Algorithms parameters of the instance. + * @param servingParams Serving parameters of the instance. 
+ * @group Meta Data + */ +@DeveloperApi +case class EngineInstance( + id: String, + status: String, + startTime: DateTime, + endTime: DateTime, + engineId: String, + engineVersion: String, + engineVariant: String, + engineFactory: String, + batch: String, + env: Map[String, String], + sparkConf: Map[String, String], + dataSourceParams: String, + preparatorParams: String, + algorithmsParams: String, + servingParams: String) + +/** :: DeveloperApi :: + * Base trait of the [[EngineInstance]] data access object + * + * @group Meta Data + */ +@DeveloperApi +trait EngineInstances { + /** Insert a new [[EngineInstance]] */ + def insert(i: EngineInstance): String + + /** Get an [[EngineInstance]] by ID */ + def get(id: String): Option[EngineInstance] + + /** Get all [[EngineInstance]]s */ + def getAll(): Seq[EngineInstance] + + /** Get an instance that has started training the latest and has trained to + * completion + */ + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] + + /** Get all instances that has trained to completion */ + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] + + /** Update an [[EngineInstance]] */ + def update(i: EngineInstance): Unit + + /** Delete an [[EngineInstance]] */ + def delete(id: String): Unit +} + +/** :: DeveloperApi :: + * JSON4S serializer for [[EngineInstance]] + * + * @group Meta Data + */ +@DeveloperApi +class EngineInstanceSerializer + extends CustomSerializer[EngineInstance]( + format => ({ + case JObject(fields) => + implicit val formats = DefaultFormats + val seed = EngineInstance( + id = "", + status = "", + startTime = DateTime.now, + endTime = DateTime.now, + engineId = "", + engineVersion = "", + engineVariant = "", + engineFactory = "", + batch = "", + env = Map(), + sparkConf = Map(), + dataSourceParams = "", + preparatorParams = "", + algorithmsParams = "", + servingParams = "") + fields.foldLeft(seed) { case (i, field) => + field match { + case JField("id", JString(id)) => i.copy(id = id) + case JField("status", JString(status)) => i.copy(status = status) + case JField("startTime", JString(startTime)) => + i.copy(startTime = Utils.stringToDateTime(startTime)) + case JField("endTime", JString(endTime)) => + i.copy(endTime = Utils.stringToDateTime(endTime)) + case JField("engineId", JString(engineId)) => + i.copy(engineId = engineId) + case JField("engineVersion", JString(engineVersion)) => + i.copy(engineVersion = engineVersion) + case JField("engineVariant", JString(engineVariant)) => + i.copy(engineVariant = engineVariant) + case JField("engineFactory", JString(engineFactory)) => + i.copy(engineFactory = engineFactory) + case JField("batch", JString(batch)) => i.copy(batch = batch) + case JField("env", env) => + i.copy(env = Extraction.extract[Map[String, String]](env)) + case JField("sparkConf", sparkConf) => + i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf)) + case JField("dataSourceParams", JString(dataSourceParams)) => + i.copy(dataSourceParams = dataSourceParams) + case JField("preparatorParams", JString(preparatorParams)) => + i.copy(preparatorParams = preparatorParams) + case JField("algorithmsParams", JString(algorithmsParams)) => + i.copy(algorithmsParams = algorithmsParams) + case JField("servingParams", JString(servingParams)) => + i.copy(servingParams = servingParams) + case _ => i + } + } + }, + { + case i: EngineInstance => + JObject( + JField("id", JString(i.id)) :: + 
JField("status", JString(i.status)) :: + JField("startTime", JString(i.startTime.toString)) :: + JField("endTime", JString(i.endTime.toString)) :: + JField("engineId", JString(i.engineId)) :: + JField("engineVersion", JString(i.engineVersion)) :: + JField("engineVariant", JString(i.engineVariant)) :: + JField("engineFactory", JString(i.engineFactory)) :: + JField("batch", JString(i.batch)) :: + JField("env", Extraction.decompose(i.env)(DefaultFormats)) :: + JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) :: + JField("dataSourceParams", JString(i.dataSourceParams)) :: + JField("preparatorParams", JString(i.preparatorParams)) :: + JField("algorithmsParams", JString(i.algorithmsParams)) :: + JField("servingParams", JString(i.servingParams)) :: + Nil) + } +)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala new file mode 100644 index 0000000..372a2e7 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineManifests.scala @@ -0,0 +1,117 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi +import org.json4s._ + +/** :: DeveloperApi :: + * Provides a way to discover engines by ID and version in a distributed + * environment + * + * @param id Unique identifier of an engine. + * @param version Engine version string. + * @param name A short and descriptive name for the engine. + * @param description A long description of the engine. + * @param files Paths to engine files. + * @param engineFactory Engine's factory class name. 
+ * @group Meta Data + */ +@DeveloperApi +case class EngineManifest( + id: String, + version: String, + name: String, + description: Option[String], + files: Seq[String], + engineFactory: String) + +/** :: DeveloperApi :: + * Base trait of the [[EngineManifest]] data access object + * + * @group Meta Data + */ +@DeveloperApi +trait EngineManifests { + /** Inserts an [[EngineManifest]] */ + def insert(engineManifest: EngineManifest): Unit + + /** Get an [[EngineManifest]] by its ID */ + def get(id: String, version: String): Option[EngineManifest] + + /** Get all [[EngineManifest]] */ + def getAll(): Seq[EngineManifest] + + /** Updates an [[EngineManifest]] */ + def update(engineInfo: EngineManifest, upsert: Boolean = false): Unit + + /** Delete an [[EngineManifest]] by its ID */ + def delete(id: String, version: String): Unit +} + +/** :: DeveloperApi :: + * JSON4S serializer for [[EngineManifest]] + * + * @group Meta Data + */ +@DeveloperApi +class EngineManifestSerializer + extends CustomSerializer[EngineManifest](format => ( + { + case JObject(fields) => + val seed = EngineManifest( + id = "", + version = "", + name = "", + description = None, + files = Nil, + engineFactory = "") + fields.foldLeft(seed) { case (enginemanifest, field) => + field match { + case JField("id", JString(id)) => enginemanifest.copy(id = id) + case JField("version", JString(version)) => + enginemanifest.copy(version = version) + case JField("name", JString(name)) => enginemanifest.copy(name = name) + case JField("description", JString(description)) => + enginemanifest.copy(description = Some(description)) + case JField("files", JArray(s)) => + enginemanifest.copy(files = s.map(t => + t match { + case JString(file) => file + case _ => "" + } + )) + case JField("engineFactory", JString(engineFactory)) => + enginemanifest.copy(engineFactory = engineFactory) + case _ => enginemanifest + } + } + }, + { + case enginemanifest: EngineManifest => + JObject( + JField("id", JString(enginemanifest.id)) :: + JField("version", JString(enginemanifest.version)) :: + JField("name", JString(enginemanifest.name)) :: + JField("description", + enginemanifest.description.map( + x => JString(x)).getOrElse(JNothing)) :: + JField("files", + JArray(enginemanifest.files.map(x => JString(x)).toList)) :: + JField("engineFactory", JString(enginemanifest.engineFactory)) :: + Nil) + } +)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala new file mode 100644 index 0000000..aa7224c --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EntityMap.scala @@ -0,0 +1,98 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.Experimental + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +/** + * :: Experimental :: + */ +@Experimental +class EntityIdIxMap(val idToIx: BiMap[String, Long]) extends Serializable { + + val ixToId: BiMap[Long, String] = idToIx.inverse + + def apply(id: String): Long = idToIx(id) + + def apply(ix: Long): String = ixToId(ix) + + def contains(id: String): Boolean = idToIx.contains(id) + + def contains(ix: Long): Boolean = ixToId.contains(ix) + + def get(id: String): Option[Long] = idToIx.get(id) + + def get(ix: Long): Option[String] = ixToId.get(ix) + + def getOrElse(id: String, default: => Long): Long = + idToIx.getOrElse(id, default) + + def getOrElse(ix: Long, default: => String): String = + ixToId.getOrElse(ix, default) + + def toMap: Map[String, Long] = idToIx.toMap + + def size: Long = idToIx.size + + def take(n: Int): EntityIdIxMap = new EntityIdIxMap(idToIx.take(n)) + + override def toString: String = idToIx.toString +} + +/** :: Experimental :: */ +@Experimental +object EntityIdIxMap { + def apply(keys: RDD[String]): EntityIdIxMap = { + new EntityIdIxMap(BiMap.stringLong(keys)) + } +} + +/** :: Experimental :: */ +@Experimental +class EntityMap[A](val idToData: Map[String, A], + override val idToIx: BiMap[String, Long]) extends EntityIdIxMap(idToIx) { + + def this(idToData: Map[String, A]) = this( + idToData, + BiMap.stringLong(idToData.keySet) + ) + + def data(id: String): A = idToData(id) + + def data(ix: Long): A = idToData(ixToId(ix)) + + def getData(id: String): Option[A] = idToData.get(id) + + def getData(ix: Long): Option[A] = idToData.get(ixToId(ix)) + + def getOrElseData(id: String, default: => A): A = + getData(id).getOrElse(default) + + def getOrElseData(ix: Long, default: => A): A = + getData(ix).getOrElse(default) + + override def take(n: Int): EntityMap[A] = { + val newIdToIx = idToIx.take(n) + new EntityMap[A](idToData.filterKeys(newIdToIx.contains(_)), newIdToIx) + } + + override def toString: String = { + s"idToData: ${idToData.toString} " + s"idToix: ${idToIx.toString}" + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala new file mode 100644 index 0000000..a58e642 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala @@ -0,0 +1,135 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage + +import com.github.nscala_time.time.Imports._ +import org.apache.predictionio.annotation.DeveloperApi +import org.json4s._ + +/** :: DeveloperApi :: + * Stores meta information for each evaluation instance. + * + * @param id Instance ID. + * @param status Status of this instance. + * @param startTime Start time of this instance. + * @param endTime End time of this instance. + * @param evaluationClass Evaluation class name of this instance. + * @param engineParamsGeneratorClass Engine parameters generator class name of this instance. + * @param batch Batch label of this instance. + * @param env The environment in which this instance was created. + * @param evaluatorResults Results of the evaluator. + * @param evaluatorResultsHTML HTML results of the evaluator. + * @param evaluatorResultsJSON JSON results of the evaluator. + * @group Meta Data + */ +@DeveloperApi +case class EvaluationInstance( + id: String = "", + status: String = "", + startTime: DateTime = DateTime.now, + endTime: DateTime = DateTime.now, + evaluationClass: String = "", + engineParamsGeneratorClass: String = "", + batch: String = "", + env: Map[String, String] = Map(), + sparkConf: Map[String, String] = Map(), + evaluatorResults: String = "", + evaluatorResultsHTML: String = "", + evaluatorResultsJSON: String = "") + +/** :: DeveloperApi :: + * Base trait of the [[EvaluationInstance]] data access object + * + * @group Meta Data + */ +@DeveloperApi +trait EvaluationInstances { + /** Insert a new [[EvaluationInstance]] */ + def insert(i: EvaluationInstance): String + + /** Get an [[EvaluationInstance]] by ID */ + def get(id: String): Option[EvaluationInstance] + + /** Get all [[EvaluationInstances]] */ + def getAll: Seq[EvaluationInstance] + + /** Get instances that are produced by evaluation and have run to completion, + * reverse sorted by the start time + */ + def getCompleted: Seq[EvaluationInstance] + + /** Update an [[EvaluationInstance]] */ + def update(i: EvaluationInstance): Unit + + /** Delete an [[EvaluationInstance]] */ + def delete(id: String): Unit +} + +/** :: DeveloperApi :: + * JSON4S serializer for [[EvaluationInstance]] + * + * @group Meta Data + */ +class EvaluationInstanceSerializer extends CustomSerializer[EvaluationInstance]( + format => ({ + case JObject(fields) => + implicit val formats = DefaultFormats + fields.foldLeft(EvaluationInstance()) { case (i, field) => + field match { + case JField("id", JString(id)) => i.copy(id = id) + case JField("status", JString(status)) => i.copy(status = status) + case JField("startTime", JString(startTime)) => + i.copy(startTime = Utils.stringToDateTime(startTime)) + case JField("endTime", JString(endTime)) => + i.copy(endTime = Utils.stringToDateTime(endTime)) + case JField("evaluationClass", JString(evaluationClass)) => + i.copy(evaluationClass = evaluationClass) + case JField("engineParamsGeneratorClass", JString(engineParamsGeneratorClass)) => + i.copy(engineParamsGeneratorClass = engineParamsGeneratorClass) + case JField("batch", JString(batch)) => i.copy(batch = batch) + case JField("env", env) => + i.copy(env = Extraction.extract[Map[String, String]](env)) + case JField("sparkConf", sparkConf) => + i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf)) + case JField("evaluatorResults", JString(evaluatorResults)) => + i.copy(evaluatorResults = evaluatorResults) + case JField("evaluatorResultsHTML", JString(evaluatorResultsHTML)) => + i.copy(evaluatorResultsHTML = evaluatorResultsHTML) + 
case JField("evaluatorResultsJSON", JString(evaluatorResultsJSON)) => + i.copy(evaluatorResultsJSON = evaluatorResultsJSON) + case _ => i + } + } + }, { + case i: EvaluationInstance => + JObject( + JField("id", JString(i.id)) :: + JField("status", JString(i.status)) :: + JField("startTime", JString(i.startTime.toString)) :: + JField("endTime", JString(i.endTime.toString)) :: + JField("evaluationClass", JString(i.evaluationClass)) :: + JField("engineParamsGeneratorClass", JString(i.engineParamsGeneratorClass)) :: + JField("batch", JString(i.batch)) :: + JField("env", Extraction.decompose(i.env)(DefaultFormats)) :: + JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) :: + JField("evaluatorResults", JString(i.evaluatorResults)) :: + JField("evaluatorResultsHTML", JString(i.evaluatorResultsHTML)) :: + JField("evaluatorResultsJSON", JString(i.evaluatorResultsJSON)) :: + Nil + ) + } + ) +)

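DataMap and EntityMap are plain in-memory helpers, so their behaviour can be exercised without an event server or a Spark cluster. The sketch below is illustrative only, again not part of the patch: the UserProperties case class and every sample value are invented, and the results noted in the comments follow from the method contracts shown in the diff. When running on Spark, EntityIdIxMap(rdd) builds the same kind of index map from an RDD[String].

import org.apache.predictionio.data.storage.DataMap
import org.apache.predictionio.data.storage.EntityMap

// Hypothetical property bag used only for this sketch.
case class UserProperties(name: String, age: Int)

object StorageHelpersSketch {
  def main(args: Array[String]): Unit = {
    val props = DataMap("""{ "name": "alice", "age": 30, "tags": ["x", "y"] }""")

    props.getOpt[Int]("age")                // Some(30)
    props.getOpt[String]("gender")          // None: property absent or null
    props.getOrElse[String]("name", "?")    // "alice"
    props.keySet                            // Set("name", "age", "tags")

    // ++ is right-biased on duplicate keys, like Scala's Map ++.
    val merged = props ++ DataMap("""{ "age": 31 }""")

    // Extract the whole map into a case class; extra fields are ignored.
    val user = merged.extract[UserProperties] // UserProperties("alice", 31)

    // EntityMap pairs every entity ID with a payload and a dense Long index.
    val ages = new EntityMap(Map("u1" -> 23, "u2" -> 35, "u3" -> 41))
    val ixOfU2: Long = ages("u2")           // index assigned by BiMap.stringLong
    val ageById: Int = ages.data("u2")      // payload looked up by ID
    val ageByIx: Int = ages.data(ixOfU2)    // or by the derived index
    ages.getData("unknown")                 // None rather than an exception

    println((user, ageById, ageByIx))
  }
}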