http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala new file mode 100644 index 0000000..2d5939f --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala @@ -0,0 +1,61 @@ +package org.apache.predictionio.workflow + +import org.apache.predictionio.controller._ + +import org.scalatest.FunSuite +import org.scalatest.Matchers._ + +class EvaluationWorkflowSuite extends FunSuite with SharedSparkContext { + + test("Evaluation return best engine params, simple result type: Double") { + val engine = new Engine1() + val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2)) + val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) + val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) + val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2)) + val engineParamsList = Seq(ep0, ep1, ep2, ep3) + + val evaluator = MetricEvaluator(new Metric0()) + + object Eval extends Evaluation { + engineEvaluator = (new Engine1(), MetricEvaluator(new Metric0())) + } + + val result = EvaluationWorkflow.runEvaluation( + sc, + Eval, + engine, + engineParamsList, + evaluator, + WorkflowParams()) + + result.bestScore.score shouldBe 0.3 + result.bestEngineParams shouldBe ep1 + } + + test("Evaluation return best engine params, complex result type") { + val engine = new Engine1() + val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2)) + val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) + val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) + val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2)) + val engineParamsList = Seq(ep0, ep1, ep2, ep3) + + val evaluator = MetricEvaluator(new Metric1()) + + object Eval extends Evaluation { + engineEvaluator = (new Engine1(), MetricEvaluator(new Metric1())) + } + + val result = EvaluationWorkflow.runEvaluation( + sc, + Eval, + engine, + engineParamsList, + evaluator, + WorkflowParams()) + + result.bestScore.score shouldBe Metric1.Result(0, 0.3) + result.bestEngineParams shouldBe ep1 + } +}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala b/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala new file mode 100644 index 0000000..217f416 --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala @@ -0,0 +1,383 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.workflow + +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.Params +import org.apache.predictionio.controller.Utils +import org.json4s.CustomSerializer +import org.json4s.JsonAST.JField +import org.json4s.JsonAST.JObject +import org.json4s.JsonAST.JString +import org.json4s.MappingException +import org.json4s.native.JsonMethods.compact +import org.json4s.native.JsonMethods.render +import org.scalatest.FunSuite +import org.scalatest.Matchers + +class JsonExtractorSuite extends FunSuite with Matchers { + + test("Extract Scala object using option Json4sNative works with optional and default value " + + "provided") { + + val json = """{"string": "query string", "optional": "optional string", "default": "d"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", Some("optional string"), "d")) + } + + test("Extract Scala object using option Json4sNative works with no optional and no default " + + "value provided") { + + val json = """{"string": "query string"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", None, "default")) + } + + test("Extract Scala object using option Json4sNative works with null optional and null default" + + " value") { + + val json = """{"string": "query string", "optional": null, "default": null}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", None, "default")) + } + + test("Extract Scala object using option Both works with optional and default value provided") { + + val json = """{"string": "query string", "optional": "optional string", "default": "d"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", Some("optional string"), "d")) + } + + test("Extract Scala object using option Both works with no optional and no default value " + + "provided") { + + val json = """{"string": "query string"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", None, "default")) + } + + test("Extract Scala object using option Both works with null optional and null default value") { + + val json = """{"string": "query string", "optional": null, "default": null}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", None, "default")) + } + + test("Extract Scala object using option Gson should not get default value and optional none" + + " value") { + + val json = """{"string": "query string"}""" + val query = JsonExtractor.extract( + JsonExtractorOption.Gson, + json, + classOf[ScalaQuery]) + + query should be (ScalaQuery("query string", null, null)) + } + + test("Extract Scala object using option Gson should throw an exception with optional " + + "value provided") { + + val json = """{"string": "query string", "optional": "o", "default": "d"}""" + intercept[RuntimeException] { + JsonExtractor.extract( + JsonExtractorOption.Gson, + json, + classOf[ScalaQuery]) + } + } + + test("Extract Java object using option Gson works") { + + val json = """{"q": "query string"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Gson, + json, + classOf[JavaQuery]) + + query should be (new JavaQuery("query string")) + } + + test("Extract Java object using option Both works") { + + val json = """{"q": "query string"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Both, + json, + classOf[JavaQuery]) + + query should be (new JavaQuery("query string")) + } + + test("Extract Java object using option Json4sNative should throw an exception") { + + val json = """{"q": "query string"}""" + + intercept[MappingException] { + JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[JavaQuery]) + } + } + + test("Extract Scala object using option Json4sNative with custom deserializer") { + val json = """{"string": "query string", "optional": "o", "default": "d"}""" + + val query = JsonExtractor.extract( + JsonExtractorOption.Json4sNative, + json, + classOf[ScalaQuery], + Utils.json4sDefaultFormats + new UpperCaseFormat + ) + + query should be(ScalaQuery("QUERY STRING", Some("O"), "D")) + } + + test("Extract Java object usingoption Gson with custom deserializer") { + val json = """{"q": "query string"}""" + + val query = JsonExtractor.extract( + extractorOption = JsonExtractorOption.Gson, + json = json, + clazz = classOf[JavaQuery], + gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory) + ) + + query should be(new JavaQuery("QUERY STRING")) + } + + test("Java object to JValue using option Both works") { + val query = new JavaQuery("query string") + val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query) + + compact(render(jValue)) should be ("""{"q":"query string"}""") + } + + test("Java object to JValue using option Gson works") { + val query = new JavaQuery("query string") + val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query) + + compact(render(jValue)) should be ("""{"q":"query string"}""") + } + + test("Java object to JValue using option Json4sNative results in empty Json") { + val query = new JavaQuery("query string") + val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query) + + compact(render(jValue)) should be ("""{}""") + } + + test("Scala object to JValue using option Both works") { + val query = new ScalaQuery("query string", Some("option")) + val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query) + + compact(render(jValue)) should + be ("""{"string":"query string","optional":"option","default":"default"}""") + } + + test("Scala object to JValue using option Gson does not serialize optional") { + val query = new ScalaQuery("query string", Some("option")) + val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query) + + compact(render(jValue)) should + be ("""{"string":"query string","optional":{},"default":"default"}""") + } + + test("Scala object to JValue using option Json4sNative works") { + val query = new ScalaQuery("query string", Some("option")) + val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query) + + compact(render(jValue)) should + be ("""{"string":"query string","optional":"option","default":"default"}""") + } + + test("Scala object to JValue using option Json4sNative with custom serializer") { + val query = new ScalaQuery("query string", Some("option")) + val jValue = JsonExtractor.toJValue( + JsonExtractorOption.Json4sNative, + query, + Utils.json4sDefaultFormats + new UpperCaseFormat + ) + + compact(render(jValue)) should + be ("""{"string":"QUERY STRING","optional":"OPTION","default":"DEFAULT"}""") + } + + test("Java object to JValue using option Gson with custom serializer") { + val query = new JavaQuery("query string") + val jValue = JsonExtractor.toJValue( + extractorOption = JsonExtractorOption.Gson, + o = query, + gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory) + ) + + compact(render(jValue)) should be ("""{"q":"QUERY STRING"}""") + } + + test("Java Param to Json using option Both") { + val param = ("algo", new JavaParams("parameter")) + val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param) + + json should be ("""{"algo":{"p":"parameter"}}""") + } + + test("Java Param to Json using option Gson") { + val param = ("algo", new JavaParams("parameter")) + val json = JsonExtractor.paramToJson(JsonExtractorOption.Gson, param) + + json should be ("""{"algo":{"p":"parameter"}}""") + } + + test("Scala Param to Json using option Both") { + val param = ("algo", AlgorithmParams("parameter")) + val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param) + + json should be ("""{"algo":{"a":"parameter"}}""") + } + + test("Scala Param to Json using option Json4sNative") { + val param = ("algo", AlgorithmParams("parameter")) + val json = JsonExtractor.paramToJson(JsonExtractorOption.Json4sNative, param) + + json should be ("""{"algo":{"a":"parameter"}}""") + } + + test("Java Params to Json using option Both") { + val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2"))) + val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params) + + json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""") + } + + test("Java Params to Json using option Gson") { + val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2"))) + val json = JsonExtractor.paramsToJson(JsonExtractorOption.Gson, params) + + json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""") + } + + test("Scala Params to Json using option Both") { + val params = + Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2"))) + val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params) + + json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats)) + } + + test("Scala Params to Json using option Json4sNative") { + val params = + Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2"))) + val json = JsonExtractor.paramsToJson(JsonExtractorOption.Json4sNative, params) + + json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats)) + } + + test("Mixed Java and Scala Params to Json using option Both") { + val params = + Seq(("scala", AlgorithmParams("parameter")), ("java", new JavaParams("parameter2"))) + val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params) + + json should be ("""[{"scala":{"a":"parameter"}},{"java":{"p":"parameter2"}}]""") + } + + test("Serializing Scala EngineParams works using option Json4sNative") { + val ep = new EngineParams( + dataSourceParams = ("ds", DataSourceParams("dsp")), + algorithmParamsList = Seq(("a0", AlgorithmParams("ap")))) + + val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Json4sNative, ep) + + json should be ( + """{"dataSourceParams":{"ds":{"a":"dsp"}},"preparatorParams":{"":{}},""" + + """"algorithmParamsList":[{"a0":{"a":"ap"}}],"servingParams":{"":{}}}""") + } + + test("Serializing Java EngineParams works using option Gson") { + val ep = new EngineParams( + dataSourceParams = ("ds", new JavaParams("dsp")), + algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2")))) + + val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Gson, ep) + + json should be ( + """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" + + """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""") + } + + test("Serializing Java EngineParams works using option Both") { + val ep = new EngineParams( + dataSourceParams = ("ds", new JavaParams("dsp")), + algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2")))) + + val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Both, ep) + + json should be ( + """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" + + """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""") + } +} + +private case class AlgorithmParams(a: String) extends Params + +private case class DataSourceParams(a: String) extends Params + +private case class ScalaQuery(string: String, optional: Option[String], default: String = "default") + +private class UpperCaseFormat extends CustomSerializer[ScalaQuery](format => ( { + case JObject(JField("string", JString(string)) :: + JField("optional", JString(optional)) :: + JField("default", JString(default)) :: + Nil) => ScalaQuery(string.toUpperCase, Some(optional.toUpperCase), default.toUpperCase) +}, { + case x: ScalaQuery => + JObject( + JField("string", JString(x.string.toUpperCase)), + JField("optional", JString(x.optional.get.toUpperCase)), + JField("default", JString(x.default.toUpperCase))) +})) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/Utils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/Utils.scala b/data/src/main/scala/io/prediction/data/Utils.scala deleted file mode 100644 index 78b71cc..0000000 --- a/data/src/main/scala/io/prediction/data/Utils.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data - -import org.joda.time.DateTime -import org.joda.time.format.ISODateTimeFormat - -import java.lang.IllegalArgumentException - -private[prediction] object Utils { - - // use dateTime() for strict ISO8601 format - val dateTimeFormatter = ISODateTimeFormat.dateTime().withOffsetParsed() - - val dateTimeNoMillisFormatter = - ISODateTimeFormat.dateTimeNoMillis().withOffsetParsed() - - def stringToDateTime(dt: String): DateTime = { - // We accept two formats. - // 1. "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" - // 2. "yyyy-MM-dd'T'HH:mm:ssZZ" - // The first one also takes milliseconds into account. - try { - // formatting for "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" - dateTimeFormatter.parseDateTime(dt) - } catch { - case e: IllegalArgumentException => { - // handle when the datetime string doesn't specify milliseconds. - dateTimeNoMillisFormatter.parseDateTime(dt) - } - } - } - - def dateTimeToString(dt: DateTime): String = dateTimeFormatter.print(dt) - // dt.toString - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Common.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/Common.scala b/data/src/main/scala/io/prediction/data/api/Common.scala deleted file mode 100644 index 6681a1d..0000000 --- a/data/src/main/scala/io/prediction/data/api/Common.scala +++ /dev/null @@ -1,80 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.webhooks.ConnectorException -import io.prediction.data.storage.StorageException - -import spray.routing._ -import spray.routing.Directives._ -import spray.routing.Rejection -import spray.http.StatusCodes -import spray.http.StatusCode -import spray.httpx.Json4sSupport - -import org.json4s.Formats -import org.json4s.DefaultFormats - -object Common { - - object Json4sProtocol extends Json4sSupport { - implicit def json4sFormats: Formats = DefaultFormats - } - - import Json4sProtocol._ - - val rejectionHandler = RejectionHandler { - case MalformedRequestContentRejection(msg, _) :: _ => - complete(StatusCodes.BadRequest, Map("message" -> msg)) - case MissingQueryParamRejection(msg) :: _ => - complete(StatusCodes.NotFound, - Map("message" -> s"missing required query parameter ${msg}.")) - case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => { - val msg = cause match { - case AuthenticationFailedRejection.CredentialsRejected => - "Invalid accessKey." - case AuthenticationFailedRejection.CredentialsMissing => - "Missing accessKey." - } - complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg)) - } - case ChannelRejection(msg) :: _ => - complete(StatusCodes.Unauthorized, Map("message" -> msg)) - case NonExistentAppRejection(msg) :: _ => - complete(StatusCodes.Unauthorized, Map("message" -> msg)) - } - - val exceptionHandler = ExceptionHandler { - case e: ConnectorException => { - val msg = s"${e.getMessage()}" - complete(StatusCodes.BadRequest, Map("message" -> msg)) - } - case e: StorageException => { - val msg = s"${e.getMessage()}" - complete(StatusCodes.InternalServerError, Map("message" -> msg)) - } - case e: Exception => { - val msg = s"${e.getMessage()}" - complete(StatusCodes.InternalServerError, Map("message" -> msg)) - } - } -} - -/** invalid channel */ -case class ChannelRejection(msg: String) extends Rejection - -/** the app doesn't exist */ -case class NonExistentAppRejection(msg: String) extends Rejection http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventInfo.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/EventInfo.scala b/data/src/main/scala/io/prediction/data/api/EventInfo.scala deleted file mode 100644 index 1e324c2..0000000 --- a/data/src/main/scala/io/prediction/data/api/EventInfo.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.storage.Event - -case class EventInfo( - appId: Int, - channelId: Option[Int], - event: Event) - http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServer.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/EventServer.scala b/data/src/main/scala/io/prediction/data/api/EventServer.scala deleted file mode 100644 index 139f964..0000000 --- a/data/src/main/scala/io/prediction/data/api/EventServer.scala +++ /dev/null @@ -1,640 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import akka.event.Logging -import sun.misc.BASE64Decoder - -import java.util.concurrent.TimeUnit - -import akka.actor._ -import akka.io.IO -import akka.pattern.ask -import akka.util.Timeout -import io.prediction.data.Utils -import io.prediction.data.storage.AccessKeys -import io.prediction.data.storage.Channels -import io.prediction.data.storage.DateTimeJson4sSupport -import io.prediction.data.storage.Event -import io.prediction.data.storage.EventJson4sSupport -import io.prediction.data.storage.BatchEventsJson4sSupport -import io.prediction.data.storage.LEvents -import io.prediction.data.storage.Storage -import org.json4s.DefaultFormats -import org.json4s.Formats -import org.json4s.JObject -import org.json4s.native.JsonMethods.parse -import spray.can.Http -import spray.http.FormData -import spray.http.MediaTypes -import spray.http.StatusCodes -import spray.httpx.Json4sSupport -import spray.routing._ -import spray.routing.authentication.Authentication - -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.util.{Try, Success, Failure} - -class EventServiceActor( - val eventClient: LEvents, - val accessKeysClient: AccessKeys, - val channelsClient: Channels, - val config: EventServerConfig) extends HttpServiceActor { - - object Json4sProtocol extends Json4sSupport { - implicit def json4sFormats: Formats = DefaultFormats + - new EventJson4sSupport.APISerializer + - new BatchEventsJson4sSupport.APISerializer + - // NOTE: don't use Json4s JodaTimeSerializers since it has issues, - // some format not converted, or timezone not correct - new DateTimeJson4sSupport.Serializer - } - - - val MaxNumberOfEventsPerBatchRequest = 50 - - val logger = Logging(context.system, this) - - // we use the enclosing ActorContext's or ActorSystem's dispatcher for our - // Futures - implicit def executionContext: ExecutionContext = context.dispatcher - - implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - val rejectionHandler = Common.rejectionHandler - - val jsonPath = """(.+)\.json$""".r - val formPath = """(.+)\.form$""".r - - val pluginContext = EventServerPluginContext(logger) - - private lazy val base64Decoder = new BASE64Decoder - - case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String]) - - /* with accessKey in query/header, return appId if succeed */ - def withAccessKey: RequestContext => Future[Authentication[AuthData]] = { - ctx: RequestContext => - val accessKeyParamOpt = ctx.request.uri.query.get("accessKey") - val channelParamOpt = ctx.request.uri.query.get("channel") - Future { - // with accessKey in query, return appId if succeed - accessKeyParamOpt.map { accessKeyParam => - accessKeysClient.get(accessKeyParam).map { k => - channelParamOpt.map { ch => - val channelMap = - channelsClient.getByAppid(k.appid) - .map(c => (c.name, c.id)).toMap - if (channelMap.contains(ch)) { - Right(AuthData(k.appid, Some(channelMap(ch)), k.events)) - } else { - Left(ChannelRejection(s"Invalid channel '$ch'.")) - } - }.getOrElse{ - Right(AuthData(k.appid, None, k.events)) - } - }.getOrElse(FailedAuth) - }.getOrElse { - // with accessKey in header, return appId if succeed - ctx.request.headers.find(_.name == "Authorization").map { authHeader â - authHeader.value.split("Basic ") match { - case Array(_, value) â - val appAccessKey = - new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0) - accessKeysClient.get(appAccessKey) match { - case Some(k) â Right(AuthData(k.appid, None, k.events)) - case None â FailedAuth - } - - case _ â FailedAuth - } - }.getOrElse(MissedAuth) - } - } - } - - private val FailedAuth = Left( - AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, List() - ) - ) - - private val MissedAuth = Left( - AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsMissing, List() - ) - ) - - lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor") - lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor") - - val route: Route = - pathSingleSlash { - import Json4sProtocol._ - - get { - respondWithMediaType(MediaTypes.`application/json`) { - complete(Map("status" -> "alive")) - } - } - } ~ - path("plugins.json") { - import Json4sProtocol._ - get { - respondWithMediaType(MediaTypes.`application/json`) { - complete { - Map("plugins" -> Map( - "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) => - n -> Map( - "name" -> p.pluginName, - "description" -> p.pluginDescription, - "class" -> p.getClass.getName) - }, - "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) => - n -> Map( - "name" -> p.pluginName, - "description" -> p.pluginDescription, - "class" -> p.getClass.getName) - } - )) - } - } - } - } ~ - path("plugins" / Segments) { segments => - get { - handleExceptions(Common.exceptionHandler) { - authenticate(withAccessKey) { authData => - respondWithMediaType(MediaTypes.`application/json`) { - complete { - val pluginArgs = segments.drop(2) - val pluginType = segments(0) - val pluginName = segments(1) - pluginType match { - case EventServerPlugin.inputBlocker => - pluginContext.inputBlockers(pluginName).handleREST( - authData.appId, - authData.channelId, - pluginArgs) - case EventServerPlugin.inputSniffer => - pluginsActorRef ? PluginsActor.HandleREST( - appId = authData.appId, - channelId = authData.channelId, - pluginName = pluginName, - pluginArgs = pluginArgs) map { - _.asInstanceOf[String] - } - } - } - } - } - } - } - } ~ - path("events" / jsonPath ) { eventId => - - import Json4sProtocol._ - - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - logger.debug(s"GET event ${eventId}.") - val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt => - eventOpt.map( event => - (StatusCodes.OK, event) - ).getOrElse( - (StatusCodes.NotFound, Map("message" -> "Not Found")) - ) - } - data - } - } - } - } - } - } ~ - delete { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - logger.debug(s"DELETE event ${eventId}.") - val data = eventClient.futureDelete(eventId, appId, channelId).map { found => - if (found) { - (StatusCodes.OK, Map("message" -> "Found")) - } else { - (StatusCodes.NotFound, Map("message" -> "Not Found")) - } - } - data - } - } - } - } - } - } - } ~ - path("events.json") { - - import Json4sProtocol._ - - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - val events = authData.events - entity(as[Event]) { event => - complete { - if (events.isEmpty || authData.events.contains(event.event)) { - pluginContext.inputBlockers.values.foreach( - _.process(EventInfo( - appId = appId, - channelId = channelId, - event = event), pluginContext)) - val data = eventClient.futureInsert(event, appId, channelId).map { id => - pluginsActorRef ! EventInfo( - appId = appId, - channelId = channelId, - event = event) - val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) - if (config.stats) { - statsActorRef ! Bookkeeping(appId, result._1, event) - } - result - } - data - } else { - (StatusCodes.Forbidden, - Map("message" -> s"${event.event} events are not allowed")) - } - } - } - } - } - } - } ~ - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - parameters( - 'startTime.as[Option[String]], - 'untilTime.as[Option[String]], - 'entityType.as[Option[String]], - 'entityId.as[Option[String]], - 'event.as[Option[String]], - 'targetEntityType.as[Option[String]], - 'targetEntityId.as[Option[String]], - 'limit.as[Option[Int]], - 'reversed.as[Option[Boolean]]) { - (startTimeStr, untilTimeStr, entityType, entityId, - eventName, // only support one event name - targetEntityType, targetEntityId, - limit, reversed) => - respondWithMediaType(MediaTypes.`application/json`) { - complete { - logger.debug( - s"GET events of appId=${appId} " + - s"st=${startTimeStr} ut=${untilTimeStr} " + - s"et=${entityType} eid=${entityId} " + - s"li=${limit} rev=${reversed} ") - - require(!((reversed == Some(true)) - && (entityType.isEmpty || entityId.isEmpty)), - "the parameter reversed can only be used with" + - " both entityType and entityId specified.") - - val parseTime = Future { - val startTime = startTimeStr.map(Utils.stringToDateTime(_)) - val untilTime = untilTimeStr.map(Utils.stringToDateTime(_)) - (startTime, untilTime) - } - - - parseTime.flatMap { case (startTime, untilTime) => - val data = eventClient.futureFind( - appId = appId, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - entityType = entityType, - entityId = entityId, - eventNames = eventName.map(List(_)), - targetEntityType = targetEntityType.map(Some(_)), - targetEntityId = targetEntityId.map(Some(_)), - limit = limit.orElse(Some(20)), - reversed = reversed) - .map { eventIter => - if (eventIter.hasNext) { - (StatusCodes.OK, eventIter.toArray) - } else { - (StatusCodes.NotFound, - Map("message" -> "Not Found")) - } - } - data - }.recover { - case e: Exception => - (StatusCodes.BadRequest, Map("message" -> s"${e}")) - } - } - } - } - } - } - } - } - } ~ - path("batch" / "events.json") { - - import Json4sProtocol._ - - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - val allowedEvents = authData.events - val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = { - case Success(event) => { - if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) { - pluginContext.inputBlockers.values.foreach( - _.process(EventInfo( - appId = appId, - channelId = channelId, - event = event), pluginContext)) - val data = eventClient.futureInsert(event, appId, channelId).map { id => - pluginsActorRef ! EventInfo( - appId = appId, - channelId = channelId, - event = event) - val status = StatusCodes.Created - val result = Map( - "status" -> status.intValue, - "eventId" -> s"${id}") - if (config.stats) { - statsActorRef ! Bookkeeping(appId, status, event) - } - result - }.recover { case exception => - Map( - "status" -> StatusCodes.InternalServerError.intValue, - "message" -> s"${exception.getMessage()}") - } - data - } else { - Future.successful(Map( - "status" -> StatusCodes.Forbidden.intValue, - "message" -> s"${event.event} events are not allowed")) - } - } - case Failure(exception) => { - Future.successful(Map( - "status" -> StatusCodes.BadRequest.intValue, - "message" -> s"${exception.getMessage()}")) - } - } - - entity(as[Seq[Try[Event]]]) { events => - complete { - if (events.length <= MaxNumberOfEventsPerBatchRequest) { - Future.traverse(events)(handleEvent) - } else { - (StatusCodes.BadRequest, - Map("message" -> (s"Batch request must have less than or equal to " + - s"${MaxNumberOfEventsPerBatchRequest} events"))) - } - } - } - } - } - } - } - } ~ - path("stats.json") { - - import Json4sProtocol._ - - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - respondWithMediaType(MediaTypes.`application/json`) { - if (config.stats) { - complete { - statsActorRef ? GetStats(appId) map { - _.asInstanceOf[Map[String, StatsSnapshot]] - } - } - } else { - complete( - StatusCodes.NotFound, - parse("""{"message": "To see stats, launch Event Server """ + - """with --stats argument."}""")) - } - } - } - } - } - } // stats.json get - } ~ - path("webhooks" / jsonPath ) { web => - import Json4sProtocol._ - - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - entity(as[JObject]) { jObj => - complete { - Webhooks.postJson( - appId = appId, - channelId = channelId, - web = web, - data = jObj, - eventClient = eventClient, - log = logger, - stats = config.stats, - statsActorRef = statsActorRef) - } - } - } - } - } - } - } ~ - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - Webhooks.getJson( - appId = appId, - channelId = channelId, - web = web, - log = logger) - } - } - } - } - } - } - } ~ - path("webhooks" / formPath ) { web => - post { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - entity(as[FormData]){ formData => - // logger.debug(formData.toString) - complete { - // respond with JSON - import Json4sProtocol._ - - Webhooks.postForm( - appId = appId, - channelId = channelId, - web = web, - data = formData, - eventClient = eventClient, - log = logger, - stats = config.stats, - statsActorRef = statsActorRef) - } - } - } - } - } - } - } ~ - get { - handleExceptions(Common.exceptionHandler) { - handleRejections(rejectionHandler) { - authenticate(withAccessKey) { authData => - val appId = authData.appId - val channelId = authData.channelId - respondWithMediaType(MediaTypes.`application/json`) { - complete { - // respond with JSON - import Json4sProtocol._ - - Webhooks.getForm( - appId = appId, - channelId = channelId, - web = web, - log = logger) - } - } - } - } - } - } - - } - - def receive: Actor.Receive = runRoute(route) -} - - - -/* message */ -case class StartServer(host: String, port: Int) - -class EventServerActor( - val eventClient: LEvents, - val accessKeysClient: AccessKeys, - val channelsClient: Channels, - val config: EventServerConfig) extends Actor with ActorLogging { - val child = context.actorOf( - Props(classOf[EventServiceActor], - eventClient, - accessKeysClient, - channelsClient, - config), - "EventServiceActor") - implicit val system = context.system - - def receive: Actor.Receive = { - case StartServer(host, portNum) => { - IO(Http) ! Http.Bind(child, interface = host, port = portNum) - } - case m: Http.Bound => log.info("Bound received. EventServer is ready.") - case m: Http.CommandFailed => log.error("Command failed.") - case _ => log.error("Unknown message.") - } -} - -case class EventServerConfig( - ip: String = "localhost", - port: Int = 7070, - plugins: String = "plugins", - stats: Boolean = false) - -object EventServer { - def createEventServer(config: EventServerConfig): Unit = { - implicit val system = ActorSystem("EventServerSystem") - - val eventClient = Storage.getLEvents() - val accessKeysClient = Storage.getMetaDataAccessKeys() - val channelsClient = Storage.getMetaDataChannels() - - val serverActor = system.actorOf( - Props( - classOf[EventServerActor], - eventClient, - accessKeysClient, - channelsClient, - config), - "EventServerActor" - ) - if (config.stats) system.actorOf(Props[StatsActor], "StatsActor") - system.actorOf(Props[PluginsActor], "PluginsActor") - serverActor ! StartServer(config.ip, config.port) - system.awaitTermination() - } -} - -object Run { - def main(args: Array[String]) { - EventServer.createEventServer(EventServerConfig( - ip = "0.0.0.0", - port = 7070)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala b/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala deleted file mode 100644 index a87fc84..0000000 --- a/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -trait EventServerPlugin { - val pluginName: String - val pluginDescription: String - val pluginType: String - - def start(context: EventServerPluginContext): Unit - - def process(eventInfo: EventInfo, context: EventServerPluginContext) - - def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String -} - -object EventServerPlugin { - val inputBlocker = "inputblocker" - val inputSniffer = "inputsniffer" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala b/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala deleted file mode 100644 index 1d8d36e..0000000 --- a/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import java.util.ServiceLoader - -import akka.event.LoggingAdapter -import grizzled.slf4j.Logging - -import scala.collection.JavaConversions._ -import scala.collection.mutable - -class EventServerPluginContext( - val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]], - val log: LoggingAdapter) { - def inputBlockers: Map[String, EventServerPlugin] = - plugins.getOrElse(EventServerPlugin.inputBlocker, Map()).toMap - - def inputSniffers: Map[String, EventServerPlugin] = - plugins.getOrElse(EventServerPlugin.inputSniffer, Map()).toMap -} - -object EventServerPluginContext extends Logging { - def apply(log: LoggingAdapter): EventServerPluginContext = { - val plugins = mutable.Map[String, mutable.Map[String, EventServerPlugin]]( - EventServerPlugin.inputBlocker -> mutable.Map(), - EventServerPlugin.inputSniffer -> mutable.Map()) - val serviceLoader = ServiceLoader.load(classOf[EventServerPlugin]) - serviceLoader foreach { service => - plugins(service.pluginType) += service.pluginName -> service - } - new EventServerPluginContext( - plugins, - log) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/PluginsActor.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/PluginsActor.scala b/data/src/main/scala/io/prediction/data/api/PluginsActor.scala deleted file mode 100644 index 7883adf..0000000 --- a/data/src/main/scala/io/prediction/data/api/PluginsActor.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import akka.actor.Actor -import akka.event.Logging - -class PluginsActor() extends Actor { - implicit val system = context.system - val log = Logging(system, this) - - val pluginContext = EventServerPluginContext(log) - - def receive: PartialFunction[Any, Unit] = { - case e: EventInfo => - pluginContext.inputSniffers.values.foreach(_.process(e, pluginContext)) - case h: PluginsActor.HandleREST => - try { - sender() ! pluginContext.inputSniffers(h.pluginName).handleREST( - h.appId, - h.channelId, - h.pluginArgs) - } catch { - case e: Exception => - sender() ! s"""{"message":"${e.getMessage}"}""" - } - case _ => - log.error("Unknown message sent to Event Server input sniffer plugin host.") - } -} - -object PluginsActor { - case class HandleREST( - pluginName: String, - appId: Int, - channelId: Option[Int], - pluginArgs: Seq[String]) -} - http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Stats.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/Stats.scala b/data/src/main/scala/io/prediction/data/api/Stats.scala deleted file mode 100644 index ca5f05e..0000000 --- a/data/src/main/scala/io/prediction/data/api/Stats.scala +++ /dev/null @@ -1,79 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.storage.Event - -import spray.http.StatusCode - -import scala.collection.mutable.{ HashMap => MHashMap } -import scala.collection.mutable - -import com.github.nscala_time.time.Imports.DateTime - -case class EntityTypesEvent( - val entityType: String, - val targetEntityType: Option[String], - val event: String) { - - def this(e: Event) = this( - e.entityType, - e.targetEntityType, - e.event) -} - -case class KV[K, V](key: K, value: V) - -case class StatsSnapshot( - val startTime: DateTime, - val endTime: Option[DateTime], - val basic: Seq[KV[EntityTypesEvent, Long]], - val statusCode: Seq[KV[StatusCode, Long]] -) - - -class Stats(val startTime: DateTime) { - private[this] var _endTime: Option[DateTime] = None - var statusCodeCount = MHashMap[(Int, StatusCode), Long]().withDefaultValue(0L) - var eteCount = MHashMap[(Int, EntityTypesEvent), Long]().withDefaultValue(0L) - - def cutoff(endTime: DateTime) { - _endTime = Some(endTime) - } - - def update(appId: Int, statusCode: StatusCode, event: Event) { - statusCodeCount((appId, statusCode)) += 1 - eteCount((appId, new EntityTypesEvent(event))) += 1 - } - - def extractByAppId[K, V](appId: Int, m: mutable.Map[(Int, K), V]) - : Seq[KV[K, V]] = { - m - .toSeq - .flatMap { case (k, v) => - if (k._1 == appId) { Seq(KV(k._2, v)) } else { Seq() } - } - } - - def get(appId: Int): StatsSnapshot = { - StatsSnapshot( - startTime, - _endTime, - extractByAppId(appId, eteCount), - extractByAppId(appId, statusCodeCount) - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/StatsActor.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/StatsActor.scala b/data/src/main/scala/io/prediction/data/api/StatsActor.scala deleted file mode 100644 index 857352f..0000000 --- a/data/src/main/scala/io/prediction/data/api/StatsActor.scala +++ /dev/null @@ -1,74 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.storage.Event - -import spray.http.StatusCode - -import akka.actor.Actor -import akka.event.Logging - -import com.github.nscala_time.time.Imports.DateTime - -/* message to StatsActor */ -case class Bookkeeping(val appId: Int, statusCode: StatusCode, event: Event) - -/* message to StatsActor */ -case class GetStats(val appId: Int) - -class StatsActor extends Actor { - implicit val system = context.system - val log = Logging(system, this) - - def getCurrent: DateTime = { - DateTime.now. - withMinuteOfHour(0). - withSecondOfMinute(0). - withMillisOfSecond(0) - } - - var longLiveStats = new Stats(DateTime.now) - var hourlyStats = new Stats(getCurrent) - - var prevHourlyStats = new Stats(getCurrent.minusHours(1)) - prevHourlyStats.cutoff(hourlyStats.startTime) - - def bookkeeping(appId: Int, statusCode: StatusCode, event: Event) { - val current = getCurrent - // If the current hour is different from the stats start time, we create - // another stats instance, and move the current to prev. - if (current != hourlyStats.startTime) { - prevHourlyStats = hourlyStats - prevHourlyStats.cutoff(current) - hourlyStats = new Stats(current) - } - - hourlyStats.update(appId, statusCode, event) - longLiveStats.update(appId, statusCode, event) - } - - def receive: Actor.Receive = { - case Bookkeeping(appId, statusCode, event) => - bookkeeping(appId, statusCode, event) - case GetStats(appId) => sender() ! Map( - "time" -> DateTime.now, - "currentHour" -> hourlyStats.get(appId), - "prevHour" -> prevHourlyStats.get(appId), - "longLive" -> longLiveStats.get(appId)) - case _ => log.error("Unknown message.") - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Webhooks.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/Webhooks.scala b/data/src/main/scala/io/prediction/data/api/Webhooks.scala deleted file mode 100644 index ff18888..0000000 --- a/data/src/main/scala/io/prediction/data/api/Webhooks.scala +++ /dev/null @@ -1,151 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.webhooks.JsonConnector -import io.prediction.data.webhooks.FormConnector -import io.prediction.data.webhooks.ConnectorUtil -import io.prediction.data.storage.Event -import io.prediction.data.storage.EventJson4sSupport -import io.prediction.data.storage.LEvents - -import spray.routing._ -import spray.routing.Directives._ -import spray.http.StatusCodes -import spray.http.StatusCode -import spray.http.FormData -import spray.httpx.Json4sSupport - -import org.json4s.Formats -import org.json4s.DefaultFormats -import org.json4s.JObject - -import akka.event.LoggingAdapter -import akka.actor.ActorSelection - -import scala.concurrent.{ExecutionContext, Future} - - -private[prediction] object Webhooks { - - def postJson( - appId: Int, - channelId: Option[Int], - web: String, - data: JObject, - eventClient: LEvents, - log: LoggingAdapter, - stats: Boolean, - statsActorRef: ActorSelection - )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { - - val eventFuture = Future { - WebhooksConnectors.json.get(web).map { connector => - ConnectorUtil.toEvent(connector, data) - } - } - - eventFuture.flatMap { eventOpt => - if (eventOpt.isEmpty) { - Future successful { - val message = s"webhooks connection for ${web} is not supported." - (StatusCodes.NotFound, Map("message" -> message)) - } - } else { - val event = eventOpt.get - val data = eventClient.futureInsert(event, appId, channelId).map { id => - val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) - - if (stats) { - statsActorRef ! Bookkeeping(appId, result._1, event) - } - result - } - data - } - } - } - - def getJson( - appId: Int, - channelId: Option[Int], - web: String, - log: LoggingAdapter - )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { - Future { - WebhooksConnectors.json.get(web).map { connector => - (StatusCodes.OK, Map("message" -> "Ok")) - }.getOrElse { - val message = s"webhooks connection for ${web} is not supported." - (StatusCodes.NotFound, Map("message" -> message)) - } - } - } - - def postForm( - appId: Int, - channelId: Option[Int], - web: String, - data: FormData, - eventClient: LEvents, - log: LoggingAdapter, - stats: Boolean, - statsActorRef: ActorSelection - )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { - val eventFuture = Future { - WebhooksConnectors.form.get(web).map { connector => - ConnectorUtil.toEvent(connector, data.fields.toMap) - } - } - - eventFuture.flatMap { eventOpt => - if (eventOpt.isEmpty) { - Future { - val message = s"webhooks connection for ${web} is not supported." - (StatusCodes.NotFound, Map("message" -> message)) - } - } else { - val event = eventOpt.get - val data = eventClient.futureInsert(event, appId, channelId).map { id => - val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) - - if (stats) { - statsActorRef ! Bookkeeping(appId, result._1, event) - } - result - } - data - } - } - } - - def getForm( - appId: Int, - channelId: Option[Int], - web: String, - log: LoggingAdapter - )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = { - Future { - WebhooksConnectors.form.get(web).map { connector => - (StatusCodes.OK, Map("message" -> "Ok")) - }.getOrElse { - val message = s"webhooks connection for ${web} is not supported." - (StatusCodes.NotFound, Map("message" -> message)) - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala b/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala deleted file mode 100644 index 97c9775..0000000 --- a/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala +++ /dev/null @@ -1,34 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.webhooks.JsonConnector -import io.prediction.data.webhooks.FormConnector - -import io.prediction.data.webhooks.segmentio.SegmentIOConnector -import io.prediction.data.webhooks.mailchimp.MailChimpConnector - -private[prediction] object WebhooksConnectors { - - val json: Map[String, JsonConnector] = Map( - "segmentio" -> SegmentIOConnector - ) - - val form: Map[String, FormConnector] = Map( - "mailchimp" -> MailChimpConnector - ) - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/package.scala b/data/src/main/scala/io/prediction/data/package.scala deleted file mode 100644 index afbe573..0000000 --- a/data/src/main/scala/io/prediction/data/package.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction - -/** Provides data access for PredictionIO and any engines running on top of - * PredictionIO - */ -package object data {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala b/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala deleted file mode 100644 index f197e78..0000000 --- a/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import java.security.SecureRandom - -import io.prediction.annotation.DeveloperApi -import org.apache.commons.codec.binary.Base64 - -/** :: DeveloperApi :: - * Stores mapping of access keys, app IDs, and lists of allowed event names - * - * @param key Access key - * @param appid App ID - * @param events List of allowed events for this particular app key - * @group Meta Data - */ -@DeveloperApi -case class AccessKey( - key: String, - appid: Int, - events: Seq[String]) - -/** :: DeveloperApi :: - * Base trait of the [[AccessKey]] data access object - * - * @group Meta Data - */ -@DeveloperApi -trait AccessKeys { - /** Insert a new [[AccessKey]]. If the key field is empty, a key will be - * generated. - */ - def insert(k: AccessKey): Option[String] - - /** Get an [[AccessKey]] by key */ - def get(k: String): Option[AccessKey] - - /** Get all [[AccessKey]]s */ - def getAll(): Seq[AccessKey] - - /** Get all [[AccessKey]]s for a particular app ID */ - def getByAppid(appid: Int): Seq[AccessKey] - - /** Update an [[AccessKey]] */ - def update(k: AccessKey): Unit - - /** Delete an [[AccessKey]] */ - def delete(k: String): Unit - - /** Default implementation of key generation */ - def generateKey: String = { - val sr = SecureRandom.getInstanceStrong - val srBytes = Array.fill(48)(0.toByte) - sr.nextBytes(srBytes) - Base64.encodeBase64URLSafeString(srBytes) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Apps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/Apps.scala b/data/src/main/scala/io/prediction/data/storage/Apps.scala deleted file mode 100644 index 32343e1..0000000 --- a/data/src/main/scala/io/prediction/data/storage/Apps.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi - -/** :: DeveloperApi :: - * Stores mapping of app IDs and names - * - * @param id ID of the app. - * @param name Name of the app. - * @param description Long description of the app. - * @group Meta Data - */ -@DeveloperApi -case class App( - id: Int, - name: String, - description: Option[String]) - -/** :: DeveloperApi :: - * Base trait of the [[App]] data access object - * - * @group Meta Data - */ -@DeveloperApi -trait Apps { - /** Insert a new [[App]]. Returns a generated app ID if the supplied app ID is 0. */ - def insert(app: App): Option[Int] - - /** Get an [[App]] by app ID */ - def get(id: Int): Option[App] - - /** Get an [[App]] by app name */ - def getByName(name: String): Option[App] - - /** Get all [[App]]s */ - def getAll(): Seq[App] - - /** Update an [[App]] */ - def update(app: App): Unit - - /** Delete an [[App]] */ - def delete(id: Int): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/BiMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/BiMap.scala b/data/src/main/scala/io/prediction/data/storage/BiMap.scala deleted file mode 100644 index cbf3e12..0000000 --- a/data/src/main/scala/io/prediction/data/storage/BiMap.scala +++ /dev/null @@ -1,164 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import scala.collection.immutable.HashMap - -import org.apache.spark.rdd.RDD - -/** Immutable Bi-directional Map - * - */ -class BiMap[K, V] private[prediction] ( - private val m: Map[K, V], - private val i: Option[BiMap[V, K]] = None - ) extends Serializable { - - // NOTE: make inverse's inverse point back to current BiMap - val inverse: BiMap[V, K] = i.getOrElse { - val rev = m.map(_.swap) - require((rev.size == m.size), - s"Failed to create reversed map. Cannot have duplicated values.") - new BiMap(rev, Some(this)) - } - - def get(k: K): Option[V] = m.get(k) - - def getOrElse(k: K, default: => V): V = m.getOrElse(k, default) - - def contains(k: K): Boolean = m.contains(k) - - def apply(k: K): V = m.apply(k) - - /** Converts to a map. - * @return a map of type immutable.Map[K, V] - */ - def toMap: Map[K, V] = m - - /** Converts to a sequence. - * @return a sequence containing all elements of this map - */ - def toSeq: Seq[(K, V)] = m.toSeq - - def size: Int = m.size - - def take(n: Int): BiMap[K, V] = BiMap(m.take(n)) - - override def toString: String = m.toString -} - -object BiMap { - - def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x) - - /** Create a BiMap[String, Long] from a set of String. The Long index starts - * from 0. - * @param keys a set of String - * @return a String to Long BiMap - */ - def stringLong(keys: Set[String]): BiMap[String, Long] = { - val hm = HashMap(keys.toSeq.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*) - new BiMap(hm) - } - - /** Create a BiMap[String, Long] from an array of String. - * NOTE: the the array cannot have duplicated element. - * The Long index starts from 0. - * @param keys a set of String - * @return a String to Long BiMap - */ - def stringLong(keys: Array[String]): BiMap[String, Long] = { - val hm = HashMap(keys.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*) - new BiMap(hm) - } - - /** Create a BiMap[String, Long] from RDD[String]. The Long index starts - * from 0. - * @param keys RDD of String - * @return a String to Long BiMap - */ - def stringLong(keys: RDD[String]): BiMap[String, Long] = { - stringLong(keys.distinct.collect) - } - - /** Create a BiMap[String, Int] from a set of String. The Int index starts - * from 0. - * @param keys a set of String - * @return a String to Int BiMap - */ - def stringInt(keys: Set[String]): BiMap[String, Int] = { - val hm = HashMap(keys.toSeq.zipWithIndex : _*) - new BiMap(hm) - } - - /** Create a BiMap[String, Int] from an array of String. - * NOTE: the the array cannot have duplicated element. - * The Int index starts from 0. - * @param keys a set of String - * @return a String to Int BiMap - */ - def stringInt(keys: Array[String]): BiMap[String, Int] = { - val hm = HashMap(keys.zipWithIndex : _*) - new BiMap(hm) - } - - /** Create a BiMap[String, Int] from RDD[String]. The Int index starts - * from 0. - * @param keys RDD of String - * @return a String to Int BiMap - */ - def stringInt(keys: RDD[String]): BiMap[String, Int] = { - stringInt(keys.distinct.collect) - } - - private[this] def stringDoubleImpl(keys: Seq[String]) - : BiMap[String, Double] = { - val ki = keys.zipWithIndex.map(e => (e._1, e._2.toDouble)) - new BiMap(HashMap(ki : _*)) - } - - /** Create a BiMap[String, Double] from a set of String. The Double index - * starts from 0. - * @param keys a set of String - * @return a String to Double BiMap - */ - def stringDouble(keys: Set[String]): BiMap[String, Double] = { - // val hm = HashMap(keys.toSeq.zipWithIndex.map(_.toDouble) : _*) - // new BiMap(hm) - stringDoubleImpl(keys.toSeq) - } - - /** Create a BiMap[String, Double] from an array of String. - * NOTE: the the array cannot have duplicated element. - * The Double index starts from 0. - * @param keys a set of String - * @return a String to Double BiMap - */ - def stringDouble(keys: Array[String]): BiMap[String, Double] = { - // val hm = HashMap(keys.zipWithIndex.mapValues(_.toDouble) : _*) - // new BiMap(hm) - stringDoubleImpl(keys.toSeq) - } - - /** Create a BiMap[String, Double] from RDD[String]. The Double index starts - * from 0. - * @param keys RDD of String - * @return a String to Double BiMap - */ - def stringDouble(keys: RDD[String]): BiMap[String, Double] = { - stringDoubleImpl(keys.distinct.collect) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Channels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/Channels.scala b/data/src/main/scala/io/prediction/data/storage/Channels.scala deleted file mode 100644 index 3fa7aef..0000000 --- a/data/src/main/scala/io/prediction/data/storage/Channels.scala +++ /dev/null @@ -1,79 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi - -/** :: DeveloperApi :: - * Stores mapping of channel IDs, names and app ID - * - * @param id ID of the channel - * @param name Name of the channel (must be unique within the same app) - * @param appid ID of the app which this channel belongs to - * @group Meta Data - */ -@DeveloperApi -case class Channel( - id: Int, - name: String, // must be unique within the same app - appid: Int -) { - require(Channel.isValidName(name), - "Invalid channel name: ${name}. ${Channel.nameConstraint}") -} - -/** :: DeveloperApi :: - * Companion object of [[Channel]] - * - * @group Meta Data - */ -@DeveloperApi -object Channel { - /** Examine whether the supplied channel name is valid. A valid channel name - * must consists of 1 to 16 alphanumeric and '-' characters. - * - * @param s Channel name to examine - * @return true if channel name is valid, false otherwise - */ - def isValidName(s: String): Boolean = { - // note: update channelNameConstraint if this rule is changed - s.matches("^[a-zA-Z0-9-]{1,16}$") - } - - /** For consistent error message display */ - val nameConstraint: String = - "Only alphanumeric and - characters are allowed and max length is 16." -} - -/** :: DeveloperApi :: - * Base trait of the [[Channel]] data access object - * - * @group Meta Data - */ -@DeveloperApi -trait Channels { - /** Insert a new [[Channel]]. Returns a generated channel ID if original ID is 0. */ - def insert(channel: Channel): Option[Int] - - /** Get a [[Channel]] by channel ID */ - def get(id: Int): Option[Channel] - - /** Get all [[Channel]] by app ID */ - def getByAppid(appid: Int): Seq[Channel] - - /** Delete a [[Channel]] */ - def delete(id: Int): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/DataMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/DataMap.scala b/data/src/main/scala/io/prediction/data/storage/DataMap.scala deleted file mode 100644 index 91a0ba5..0000000 --- a/data/src/main/scala/io/prediction/data/storage/DataMap.scala +++ /dev/null @@ -1,241 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.json4s._ -import org.json4s.native.JsonMethods.parse - -import scala.collection.GenTraversableOnce -import scala.collection.JavaConversions - -/** Exception class for [[DataMap]] - * - * @group Event Data - */ -case class DataMapException(msg: String, cause: Exception) - extends Exception(msg, cause) { - def this(msg: String) = this(msg, null) -} - -/** A DataMap stores properties of the event or entity. Internally it is a Map - * whose keys are property names and values are corresponding JSON values - * respectively. Use the [[get]] method to retrieve the value of a mandatory - * property or use [[getOpt]] to retrieve the value of an optional property. - * - * @param fields Map of property name to JValue - * @group Event Data - */ -class DataMap ( - val fields: Map[String, JValue] -) extends Serializable { - @transient lazy implicit private val formats = DefaultFormats + - new DateTimeJson4sSupport.Serializer - - /** Check the existence of a required property name. Throw an exception if - * it does not exist. - * - * @param name The property name - */ - def require(name: String): Unit = { - if (!fields.contains(name)) { - throw new DataMapException(s"The field $name is required.") - } - } - - /** Check if this DataMap contains a specific property. - * - * @param name The property name - * @return Return true if the property exists, else false. - */ - def contains(name: String): Boolean = { - fields.contains(name) - } - - /** Get the value of a mandatory property. Exception is thrown if the property - * does not exist. - * - * @tparam T The type of the property value - * @param name The property name - * @return Return the property value of type T - */ - def get[T: Manifest](name: String): T = { - require(name) - fields(name) match { - case JNull => throw new DataMapException( - s"The required field $name cannot be null.") - case x: JValue => x.extract[T] - } - } - - /** Get the value of an optional property. Return None if the property does - * not exist. - * - * @tparam T The type of the property value - * @param name The property name - * @return Return the property value of type Option[T] - */ - def getOpt[T: Manifest](name: String): Option[T] = { - // either the field doesn't exist or its value is null - fields.get(name).flatMap(_.extract[Option[T]]) - } - - /** Get the value of an optional property. Return default value if the - * property does not exist. - * - * @tparam T The type of the property value - * @param name The property name - * @param default The default property value of type T - * @return Return the property value of type T - */ - def getOrElse[T: Manifest](name: String, default: T): T = { - getOpt[T](name).getOrElse(default) - } - - /** Java-friendly method for getting the value of a property. Return null if the - * property does not exist. - * - * @tparam T The type of the property value - * @param name The property name - * @param clazz The class of the type of the property value - * @return Return the property value of type T - */ - def get[T](name: String, clazz: java.lang.Class[T]): T = { - val manifest = new Manifest[T] { - override def erasure: Class[_] = clazz - override def runtimeClass: Class[_] = clazz - } - - fields.get(name) match { - case None => null.asInstanceOf[T] - case Some(JNull) => null.asInstanceOf[T] - case Some(x) => x.extract[T](formats, manifest) - } - } - - /** Java-friendly method for getting a list of values of a property. Return null if the - * property does not exist. - * - * @param name The property name - * @return Return the list of property values - */ - def getStringList(name: String): java.util.List[String] = { - fields.get(name) match { - case None => null - case Some(JNull) => null - case Some(x) => - JavaConversions.seqAsJavaList(x.extract[List[String]](formats, manifest[List[String]])) - } - } - - /** Return a new DataMap with elements containing elements from the left hand - * side operand followed by elements from the right hand side operand. - * - * @param that Right hand side DataMap - * @return A new DataMap - */ - def ++ (that: DataMap): DataMap = DataMap(this.fields ++ that.fields) - - /** Creates a new DataMap from this DataMap by removing all elements of - * another collection. - * - * @param that A collection containing the removed property names - * @return A new DataMap - */ - def -- (that: GenTraversableOnce[String]): DataMap = - DataMap(this.fields -- that) - - /** Tests whether the DataMap is empty. - * - * @return true if the DataMap is empty, false otherwise. - */ - def isEmpty: Boolean = fields.isEmpty - - /** Collects all property names of this DataMap in a set. - * - * @return a set containing all property names of this DataMap. - */ - def keySet: Set[String] = this.fields.keySet - - /** Converts this DataMap to a List. - * - * @return a list of (property name, JSON value) tuples. - */ - def toList(): List[(String, JValue)] = fields.toList - - /** Converts this DataMap to a JObject. - * - * @return the JObject initialized by this DataMap. - */ - def toJObject(): JObject = JObject(toList()) - - /** Converts this DataMap to case class of type T. - * - * @return the object of type T. - */ - def extract[T: Manifest]: T = { - toJObject().extract[T] - } - - override - def toString: String = s"DataMap($fields)" - - override - def hashCode: Int = 41 + fields.hashCode - - override - def equals(other: Any): Boolean = other match { - case that: DataMap => that.canEqual(this) && this.fields.equals(that.fields) - case _ => false - } - - def canEqual(other: Any): Boolean = other.isInstanceOf[DataMap] -} - -/** Companion object of the [[DataMap]] class - * - * @group Event Data - */ -object DataMap { - /** Create an empty DataMap - * @return an empty DataMap - */ - def apply(): DataMap = new DataMap(Map[String, JValue]()) - - /** Create an DataMap from a Map of String to JValue - * @param fields a Map of String to JValue - * @return a new DataMap initialized by fields - */ - def apply(fields: Map[String, JValue]): DataMap = new DataMap(fields) - - /** Create an DataMap from a JObject - * @param jObj JObject - * @return a new DataMap initialized by a JObject - */ - def apply(jObj: JObject): DataMap = { - if (jObj == null) { - apply() - } else { - new DataMap(jObj.obj.toMap) - } - } - - /** Create an DataMap from a JSON String - * @param js JSON String. eg """{ "a": 1, "b": "foo" }""" - * @return a new DataMap initialized by a JSON string - */ - def apply(js: String): DataMap = apply(parse(js).asInstanceOf[JObject]) - -}
