http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala b/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala deleted file mode 100644 index 48f935a..0000000 --- a/data/src/main/scala/io/prediction/data/storage/DateTimeJson4sSupport.scala +++ /dev/null @@ -1,47 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi -import io.prediction.data.{Utils => DataUtils} -import org.joda.time.DateTime -import org.json4s._ - -/** :: DeveloperApi :: - * JSON4S serializer for Joda-Time - * - * @group Common - */ -@DeveloperApi -object DateTimeJson4sSupport { - - @transient lazy implicit val formats = DefaultFormats - - /** Serialize DateTime to JValue */ - def serializeToJValue: PartialFunction[Any, JValue] = { - case d: DateTime => JString(DataUtils.dateTimeToString(d)) - } - - /** Deserialize JValue to DateTime */ - def deserializeFromJValue: PartialFunction[JValue, DateTime] = { - case jv: JValue => DataUtils.stringToDateTime(jv.extract[String]) - } - - /** Custom JSON4S serializer for Joda-Time */ - class Serializer extends CustomSerializer[DateTime](format => ( - deserializeFromJValue, serializeToJValue)) - -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala b/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala deleted file mode 100644 index fdbb6ba..0000000 --- a/data/src/main/scala/io/prediction/data/storage/EngineInstances.scala +++ /dev/null @@ -1,177 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import com.github.nscala_time.time.Imports._ -import io.prediction.annotation.DeveloperApi -import org.json4s._ - -/** :: DeveloperApi :: - * Stores parameters, model, and other information for each engine instance - * - * @param id Engine instance ID. - * @param status Status of the engine instance. - * @param startTime Start time of the training/evaluation. - * @param endTime End time of the training/evaluation. - * @param engineId Engine ID of the instance. - * @param engineVersion Engine version of the instance. - * @param engineVariant Engine variant ID of the instance. - * @param engineFactory Engine factory class for the instance. - * @param batch A batch label of the engine instance. - * @param env The environment in which the instance was created. 
- * @param sparkConf Custom Spark configuration of the instance. - * @param dataSourceParams Data source parameters of the instance. - * @param preparatorParams Preparator parameters of the instance. - * @param algorithmsParams Algorithms parameters of the instance. - * @param servingParams Serving parameters of the instance. - * @group Meta Data - */ -@DeveloperApi -case class EngineInstance( - id: String, - status: String, - startTime: DateTime, - endTime: DateTime, - engineId: String, - engineVersion: String, - engineVariant: String, - engineFactory: String, - batch: String, - env: Map[String, String], - sparkConf: Map[String, String], - dataSourceParams: String, - preparatorParams: String, - algorithmsParams: String, - servingParams: String) - -/** :: DeveloperApi :: - * Base trait of the [[EngineInstance]] data access object - * - * @group Meta Data - */ -@DeveloperApi -trait EngineInstances { - /** Insert a new [[EngineInstance]] */ - def insert(i: EngineInstance): String - - /** Get an [[EngineInstance]] by ID */ - def get(id: String): Option[EngineInstance] - - /** Get all [[EngineInstance]]s */ - def getAll(): Seq[EngineInstance] - - /** Get an instance that has started training the latest and has trained to - * completion - */ - def getLatestCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Option[EngineInstance] - - /** Get all instances that has trained to completion */ - def getCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Seq[EngineInstance] - - /** Update an [[EngineInstance]] */ - def update(i: EngineInstance): Unit - - /** Delete an [[EngineInstance]] */ - def delete(id: String): Unit -} - -/** :: DeveloperApi :: - * JSON4S serializer for [[EngineInstance]] - * - * @group Meta Data - */ -@DeveloperApi -class EngineInstanceSerializer - extends CustomSerializer[EngineInstance]( - format => ({ - case JObject(fields) => - implicit val formats = DefaultFormats - val seed = 
EngineInstance( - id = "", - status = "", - startTime = DateTime.now, - endTime = DateTime.now, - engineId = "", - engineVersion = "", - engineVariant = "", - engineFactory = "", - batch = "", - env = Map(), - sparkConf = Map(), - dataSourceParams = "", - preparatorParams = "", - algorithmsParams = "", - servingParams = "") - fields.foldLeft(seed) { case (i, field) => - field match { - case JField("id", JString(id)) => i.copy(id = id) - case JField("status", JString(status)) => i.copy(status = status) - case JField("startTime", JString(startTime)) => - i.copy(startTime = Utils.stringToDateTime(startTime)) - case JField("endTime", JString(endTime)) => - i.copy(endTime = Utils.stringToDateTime(endTime)) - case JField("engineId", JString(engineId)) => - i.copy(engineId = engineId) - case JField("engineVersion", JString(engineVersion)) => - i.copy(engineVersion = engineVersion) - case JField("engineVariant", JString(engineVariant)) => - i.copy(engineVariant = engineVariant) - case JField("engineFactory", JString(engineFactory)) => - i.copy(engineFactory = engineFactory) - case JField("batch", JString(batch)) => i.copy(batch = batch) - case JField("env", env) => - i.copy(env = Extraction.extract[Map[String, String]](env)) - case JField("sparkConf", sparkConf) => - i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf)) - case JField("dataSourceParams", JString(dataSourceParams)) => - i.copy(dataSourceParams = dataSourceParams) - case JField("preparatorParams", JString(preparatorParams)) => - i.copy(preparatorParams = preparatorParams) - case JField("algorithmsParams", JString(algorithmsParams)) => - i.copy(algorithmsParams = algorithmsParams) - case JField("servingParams", JString(servingParams)) => - i.copy(servingParams = servingParams) - case _ => i - } - } - }, - { - case i: EngineInstance => - JObject( - JField("id", JString(i.id)) :: - JField("status", JString(i.status)) :: - JField("startTime", JString(i.startTime.toString)) :: - JField("endTime", 
JString(i.endTime.toString)) :: - JField("engineId", JString(i.engineId)) :: - JField("engineVersion", JString(i.engineVersion)) :: - JField("engineVariant", JString(i.engineVariant)) :: - JField("engineFactory", JString(i.engineFactory)) :: - JField("batch", JString(i.batch)) :: - JField("env", Extraction.decompose(i.env)(DefaultFormats)) :: - JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) :: - JField("dataSourceParams", JString(i.dataSourceParams)) :: - JField("preparatorParams", JString(i.preparatorParams)) :: - JField("algorithmsParams", JString(i.algorithmsParams)) :: - JField("servingParams", JString(i.servingParams)) :: - Nil) - } -)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala b/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala deleted file mode 100644 index d69ceae..0000000 --- a/data/src/main/scala/io/prediction/data/storage/EngineManifests.scala +++ /dev/null @@ -1,117 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi -import org.json4s._ - -/** :: DeveloperApi :: - * Provides a way to discover engines by ID and version in a distributed - * environment - * - * @param id Unique identifier of an engine. - * @param version Engine version string. - * @param name A short and descriptive name for the engine. - * @param description A long description of the engine. - * @param files Paths to engine files. - * @param engineFactory Engine's factory class name. - * @group Meta Data - */ -@DeveloperApi -case class EngineManifest( - id: String, - version: String, - name: String, - description: Option[String], - files: Seq[String], - engineFactory: String) - -/** :: DeveloperApi :: - * Base trait of the [[EngineManifest]] data access object - * - * @group Meta Data - */ -@DeveloperApi -trait EngineManifests { - /** Inserts an [[EngineManifest]] */ - def insert(engineManifest: EngineManifest): Unit - - /** Get an [[EngineManifest]] by its ID */ - def get(id: String, version: String): Option[EngineManifest] - - /** Get all [[EngineManifest]] */ - def getAll(): Seq[EngineManifest] - - /** Updates an [[EngineManifest]] */ - def update(engineInfo: EngineManifest, upsert: Boolean = false): Unit - - /** Delete an [[EngineManifest]] by its ID */ - def delete(id: String, version: String): Unit -} - -/** :: DeveloperApi :: - * JSON4S serializer for [[EngineManifest]] - * - * @group Meta Data - */ -@DeveloperApi -class EngineManifestSerializer - extends CustomSerializer[EngineManifest](format => ( - { - case JObject(fields) => - val seed = EngineManifest( - id = "", - version = "", - name = "", - description = None, - files = Nil, - engineFactory = "") - fields.foldLeft(seed) { case (enginemanifest, field) => - field match { - case JField("id", JString(id)) => enginemanifest.copy(id = id) - case JField("version", JString(version)) => - enginemanifest.copy(version = version) - case JField("name", JString(name)) => 
enginemanifest.copy(name = name) - case JField("description", JString(description)) => - enginemanifest.copy(description = Some(description)) - case JField("files", JArray(s)) => - enginemanifest.copy(files = s.map(t => - t match { - case JString(file) => file - case _ => "" - } - )) - case JField("engineFactory", JString(engineFactory)) => - enginemanifest.copy(engineFactory = engineFactory) - case _ => enginemanifest - } - } - }, - { - case enginemanifest: EngineManifest => - JObject( - JField("id", JString(enginemanifest.id)) :: - JField("version", JString(enginemanifest.version)) :: - JField("name", JString(enginemanifest.name)) :: - JField("description", - enginemanifest.description.map( - x => JString(x)).getOrElse(JNothing)) :: - JField("files", - JArray(enginemanifest.files.map(x => JString(x)).toList)) :: - JField("engineFactory", JString(enginemanifest.engineFactory)) :: - Nil) - } -)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EntityMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/EntityMap.scala b/data/src/main/scala/io/prediction/data/storage/EntityMap.scala deleted file mode 100644 index d9cd4c8..0000000 --- a/data/src/main/scala/io/prediction/data/storage/EntityMap.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prediction.data.storage - -import io.prediction.annotation.Experimental - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -/** - * :: Experimental :: - */ -@Experimental -class EntityIdIxMap(val idToIx: BiMap[String, Long]) extends Serializable { - - val ixToId: BiMap[Long, String] = idToIx.inverse - - def apply(id: String): Long = idToIx(id) - - def apply(ix: Long): String = ixToId(ix) - - def contains(id: String): Boolean = idToIx.contains(id) - - def contains(ix: Long): Boolean = ixToId.contains(ix) - - def get(id: String): Option[Long] = idToIx.get(id) - - def get(ix: Long): Option[String] = ixToId.get(ix) - - def getOrElse(id: String, default: => Long): Long = - idToIx.getOrElse(id, default) - - def getOrElse(ix: Long, default: => String): String = - ixToId.getOrElse(ix, default) - - def toMap: Map[String, Long] = idToIx.toMap - - def size: Long = idToIx.size - - def take(n: Int): EntityIdIxMap = new EntityIdIxMap(idToIx.take(n)) - - override def toString: String = idToIx.toString -} - -/** :: Experimental :: */ -@Experimental -object EntityIdIxMap { - def apply(keys: RDD[String]): EntityIdIxMap = { - new EntityIdIxMap(BiMap.stringLong(keys)) - } -} - -/** :: Experimental :: */ -@Experimental -class EntityMap[A](val idToData: Map[String, A], - override val idToIx: BiMap[String, Long]) extends EntityIdIxMap(idToIx) { - - def this(idToData: Map[String, A]) = this( - idToData, - BiMap.stringLong(idToData.keySet) - ) - - def data(id: String): A = idToData(id) - - def data(ix: Long): A = idToData(ixToId(ix)) - - def getData(id: String): Option[A] = idToData.get(id) - - def getData(ix: Long): Option[A] = idToData.get(ixToId(ix)) - - def getOrElseData(id: String, default: => A): A = - getData(id).getOrElse(default) - - def getOrElseData(ix: Long, default: => A): A = - 
getData(ix).getOrElse(default) - - override def take(n: Int): EntityMap[A] = { - val newIdToIx = idToIx.take(n) - new EntityMap[A](idToData.filterKeys(newIdToIx.contains(_)), newIdToIx) - } - - override def toString: String = { - s"idToData: ${idToData.toString} " + s"idToix: ${idToIx.toString}" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala b/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala deleted file mode 100644 index 0a7d502..0000000 --- a/data/src/main/scala/io/prediction/data/storage/EvaluationInstances.scala +++ /dev/null @@ -1,135 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import com.github.nscala_time.time.Imports._ -import io.prediction.annotation.DeveloperApi -import org.json4s._ - -/** :: DeveloperApi :: - * Stores meta information for each evaluation instance. - * - * @param id Instance ID. - * @param status Status of this instance. - * @param startTime Start time of this instance. - * @param endTime End time of this instance. - * @param evaluationClass Evaluation class name of this instance. 
- * @param engineParamsGeneratorClass Engine parameters generator class name of this instance. - * @param batch Batch label of this instance. - * @param env The environment in which this instance was created. - * @param evaluatorResults Results of the evaluator. - * @param evaluatorResultsHTML HTML results of the evaluator. - * @param evaluatorResultsJSON JSON results of the evaluator. - * @group Meta Data - */ -@DeveloperApi -case class EvaluationInstance( - id: String = "", - status: String = "", - startTime: DateTime = DateTime.now, - endTime: DateTime = DateTime.now, - evaluationClass: String = "", - engineParamsGeneratorClass: String = "", - batch: String = "", - env: Map[String, String] = Map(), - sparkConf: Map[String, String] = Map(), - evaluatorResults: String = "", - evaluatorResultsHTML: String = "", - evaluatorResultsJSON: String = "") - -/** :: DeveloperApi :: - * Base trait of the [[EvaluationInstance]] data access object - * - * @group Meta Data - */ -@DeveloperApi -trait EvaluationInstances { - /** Insert a new [[EvaluationInstance]] */ - def insert(i: EvaluationInstance): String - - /** Get an [[EvaluationInstance]] by ID */ - def get(id: String): Option[EvaluationInstance] - - /** Get all [[EvaluationInstances]] */ - def getAll: Seq[EvaluationInstance] - - /** Get instances that are produced by evaluation and have run to completion, - * reverse sorted by the start time - */ - def getCompleted: Seq[EvaluationInstance] - - /** Update an [[EvaluationInstance]] */ - def update(i: EvaluationInstance): Unit - - /** Delete an [[EvaluationInstance]] */ - def delete(id: String): Unit -} - -/** :: DeveloperApi :: - * JSON4S serializer for [[EvaluationInstance]] - * - * @group Meta Data - */ -class EvaluationInstanceSerializer extends CustomSerializer[EvaluationInstance]( - format => ({ - case JObject(fields) => - implicit val formats = DefaultFormats - fields.foldLeft(EvaluationInstance()) { case (i, field) => - field match { - case JField("id", 
JString(id)) => i.copy(id = id) - case JField("status", JString(status)) => i.copy(status = status) - case JField("startTime", JString(startTime)) => - i.copy(startTime = Utils.stringToDateTime(startTime)) - case JField("endTime", JString(endTime)) => - i.copy(endTime = Utils.stringToDateTime(endTime)) - case JField("evaluationClass", JString(evaluationClass)) => - i.copy(evaluationClass = evaluationClass) - case JField("engineParamsGeneratorClass", JString(engineParamsGeneratorClass)) => - i.copy(engineParamsGeneratorClass = engineParamsGeneratorClass) - case JField("batch", JString(batch)) => i.copy(batch = batch) - case JField("env", env) => - i.copy(env = Extraction.extract[Map[String, String]](env)) - case JField("sparkConf", sparkConf) => - i.copy(sparkConf = Extraction.extract[Map[String, String]](sparkConf)) - case JField("evaluatorResults", JString(evaluatorResults)) => - i.copy(evaluatorResults = evaluatorResults) - case JField("evaluatorResultsHTML", JString(evaluatorResultsHTML)) => - i.copy(evaluatorResultsHTML = evaluatorResultsHTML) - case JField("evaluatorResultsJSON", JString(evaluatorResultsJSON)) => - i.copy(evaluatorResultsJSON = evaluatorResultsJSON) - case _ => i - } - } - }, { - case i: EvaluationInstance => - JObject( - JField("id", JString(i.id)) :: - JField("status", JString(i.status)) :: - JField("startTime", JString(i.startTime.toString)) :: - JField("endTime", JString(i.endTime.toString)) :: - JField("evaluationClass", JString(i.evaluationClass)) :: - JField("engineParamsGeneratorClass", JString(i.engineParamsGeneratorClass)) :: - JField("batch", JString(i.batch)) :: - JField("env", Extraction.decompose(i.env)(DefaultFormats)) :: - JField("sparkConf", Extraction.decompose(i.sparkConf)(DefaultFormats)) :: - JField("evaluatorResults", JString(i.evaluatorResults)) :: - JField("evaluatorResultsHTML", JString(i.evaluatorResultsHTML)) :: - JField("evaluatorResultsJSON", JString(i.evaluatorResultsJSON)) :: - Nil - ) - } - ) -) 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Event.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/Event.scala b/data/src/main/scala/io/prediction/data/storage/Event.scala deleted file mode 100644 index abc16b9..0000000 --- a/data/src/main/scala/io/prediction/data/storage/Event.scala +++ /dev/null @@ -1,164 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi -import org.joda.time.DateTime -import org.joda.time.DateTimeZone - -/** Each event in the Event Store can be represented by fields in this case - * class. - * - * @param eventId Unique ID of this event. - * @param event Name of this event. - * @param entityType Type of the entity associated with this event. - * @param entityId ID of the entity associated with this event. - * @param targetEntityType Type of the target entity associated with this - * event. - * @param targetEntityId ID of the target entity associated with this event. - * @param properties Properties associated with this event. - * @param eventTime Time of the happening of this event. - * @param tags Tags of this event. - * @param prId PredictedResultId of this event. - * @param creationTime Time of creation in the system of this event. 
- * @group Event Data - */ -case class Event( - val eventId: Option[String] = None, - val event: String, - val entityType: String, - val entityId: String, - val targetEntityType: Option[String] = None, - val targetEntityId: Option[String] = None, - val properties: DataMap = DataMap(), // default empty - val eventTime: DateTime = DateTime.now, - val tags: Seq[String] = Nil, - val prId: Option[String] = None, - val creationTime: DateTime = DateTime.now -) { - override def toString(): String = { - s"Event(id=$eventId,event=$event,eType=$entityType,eId=$entityId," + - s"tType=$targetEntityType,tId=$targetEntityId,p=$properties,t=$eventTime," + - s"tags=$tags,pKey=$prId,ct=$creationTime)" - } -} - -/** :: DeveloperApi :: - * Utilities for validating [[Event]]s - * - * @group Event Data - */ -@DeveloperApi -object EventValidation { - /** Default time zone is set to UTC */ - val defaultTimeZone = DateTimeZone.UTC - - /** Checks whether an event name contains a reserved prefix - * - * @param name Event name - * @return true if event name starts with \$ or pio_, false otherwise - */ - def isReservedPrefix(name: String): Boolean = name.startsWith("$") || - name.startsWith("pio_") - - /** PredictionIO reserves some single entity event names. They are currently - * \$set, \$unset, and \$delete. 
- */ - val specialEvents = Set("$set", "$unset", "$delete") - - /** Checks whether an event name is a special PredictionIO event name - * - * @param name Event name - * @return true if the name is a special event, false otherwise - */ - def isSpecialEvents(name: String): Boolean = specialEvents.contains(name) - - /** Validate an [[Event]], throwing exceptions when the candidate violates any - * of the following: - * - * - event name must not be empty - * - entityType must not be empty - * - entityId must not be empty - * - targetEntityType must not be Some of empty - * - targetEntityId must not be Some of empty - * - targetEntityType and targetEntityId must be both Some or None - * - properties must not be empty when event is \$unset - * - event name must be a special event if it has a reserved prefix - * - targetEntityType and targetEntityId must be None if the event name has - * a reserved prefix - * - entityType must be a built-in entity type if entityType has a - * reserved prefix - * - targetEntityType must be a built-in entity type if targetEntityType is - * Some and has a reserved prefix - * - * @param e Event to be validated - */ - def validate(e: Event): Unit = { - - require(!e.event.isEmpty, "event must not be empty.") - require(!e.entityType.isEmpty, "entityType must not be empty string.") - require(!e.entityId.isEmpty, "entityId must not be empty string.") - require(e.targetEntityType.map(!_.isEmpty).getOrElse(true), - "targetEntityType must not be empty string") - require(e.targetEntityId.map(!_.isEmpty).getOrElse(true), - "targetEntityId must not be empty string.") - require(!((e.targetEntityType != None) && (e.targetEntityId == None)), - "targetEntityType and targetEntityId must be specified together.") - require(!((e.targetEntityType == None) && (e.targetEntityId != None)), - "targetEntityType and targetEntityId must be specified together.") - require(!((e.event == "$unset") && e.properties.isEmpty), - "properties cannot be empty for $unset event") 
- require(!isReservedPrefix(e.event) || isSpecialEvents(e.event), - s"${e.event} is not a supported reserved event name.") - require(!isSpecialEvents(e.event) || - ((e.targetEntityType == None) && (e.targetEntityId == None)), - s"Reserved event ${e.event} cannot have targetEntity") - require(!isReservedPrefix(e.entityType) || - isBuiltinEntityTypes(e.entityType), - s"The entityType ${e.entityType} is not allowed. " + - s"'pio_' is a reserved name prefix.") - require(e.targetEntityType.map{ t => - (!isReservedPrefix(t) || isBuiltinEntityTypes(t))}.getOrElse(true), - s"The targetEntityType ${e.targetEntityType.get} is not allowed. " + - s"'pio_' is a reserved name prefix.") - validateProperties(e) - } - - /** Defines built-in entity types. The current built-in type is pio_pr. */ - val builtinEntityTypes: Set[String] = Set("pio_pr") - - /** Defines built-in properties. This is currently empty. */ - val builtinProperties: Set[String] = Set() - - /** Checks whether an entity type is a built-in entity type */ - def isBuiltinEntityTypes(name: String): Boolean = builtinEntityTypes.contains(name) - - /** Validate event properties, throwing exceptions when the candidate violates - * any of the following: - * - * - property name must not contain a reserved prefix - * - * @param e Event to be validated - */ - def validateProperties(e: Event): Unit = { - e.properties.keySet.foreach { k => - require(!isReservedPrefix(k) || builtinProperties.contains(k), - s"The property ${k} is not allowed. 
" + - s"'pio_' is a reserved name prefix.") - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala b/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala deleted file mode 100644 index 22243c2..0000000 --- a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala +++ /dev/null @@ -1,236 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi -import io.prediction.data.{Utils => DataUtils} -import org.joda.time.DateTime -import org.json4s._ -import scala.util.{Try, Success, Failure} - -/** :: DeveloperApi :: - * Support library for dealing with [[Event]] and JSON4S - * - * @group Event Data - */ -@DeveloperApi -object EventJson4sSupport { - /** This is set to org.json4s.DefaultFormats. 
Do not use JSON4S to serialize - * or deserialize Joda-Time DateTime because it has some issues with timezone - * (as of version 3.2.10) - */
  implicit val formats = DefaultFormats

  /** Renders an optional string as a JSON string, or as JNothing when the
    * value is absent.
    */
  private def optionalJString(value: Option[String]): JValue =
    value.map(JString(_)).getOrElse(JNothing)

  /** :: DeveloperApi ::
    * Convert JSON from Event Server to [[Event]]
    *
    * Only JSON objects are handled. "event", "entityType" and "entityId" are
    * read as required fields; every other field is optional. Any exception
    * raised while extracting or validating the event is rethrown as a
    * MappingException.
    *
    * @return deserialization routine used by [[APISerializer]]
    */
  @DeveloperApi
  def readJson: PartialFunction[JValue, Event] = {
    case JObject(jsonFields) =>
      val data = new DataMap(jsonFields.toMap)

      // Parses a client-supplied eventTime; unparsable input is reported as a
      // MappingException that names the offending value.
      def parseEventTime(raw: String): DateTime =
        try {
          DataUtils.stringToDateTime(raw)
        } catch {
          case _: Exception =>
            throw new MappingException(s"Fail to extract eventTime ${raw}")
        }

      try {
        // Default timestamp, expressed in EventValidation.defaultTimeZone.
        // Lazy, so the clock is read only where the value is first needed.
        lazy val now = DateTime.now(EventValidation.defaultTimeZone)

        // get() is used for fields required in the JSON, getOpt()/getOrElse()
        // for optional ones. Tags are not read from the API for now, and
        // creationTime cannot be set by the client: it is always `now`.
        val parsed = Event(
          event = data.get[String]("event"),
          entityType = data.get[String]("entityType"),
          entityId = data.get[String]("entityId"),
          targetEntityType = data.getOpt[String]("targetEntityType"),
          targetEntityId = data.getOpt[String]("targetEntityId"),
          properties = DataMap(
            data.getOrElse[Map[String, JValue]]("properties", Map())),
          eventTime =
            data.getOpt[String]("eventTime").map(parseEventTime).getOrElse(now),
          prId = data.getOpt[String]("prId"),
          creationTime = now
        )
        EventValidation.validate(parsed)
        parsed
      } catch {
        case e: Exception => throw new MappingException(e.toString, e)
      }
  }

  /** :: DeveloperApi ::
    * Convert [[Event]] to JSON for use by the Event Server
    *
    * Tags are not written. Absent optional values are emitted as JNothing.
    *
    * @return serialization routine used by [[APISerializer]]
    */
  @DeveloperApi
  def writeJson: PartialFunction[Any, JValue] = {
    case e: Event =>
      JObject(List(
        JField("eventId", optionalJString(e.eventId)),
        JField("event", JString(e.event)),
        JField("entityType", JString(e.entityType)),
        JField("entityId", JString(e.entityId)),
        JField("targetEntityType", optionalJString(e.targetEntityType)),
        JField("targetEntityId", optionalJString(e.targetEntityId)),
        JField("properties", e.properties.toJObject),
        JField("eventTime", JString(DataUtils.dateTimeToString(e.eventTime))),
        // tags are disabled from the API for now
        JField("prId", optionalJString(e.prId)),
        JField("creationTime",
          JString(DataUtils.dateTimeToString(e.creationTime)))
      ))
  }

  /** :: DeveloperApi ::
    * Convert JSON4S JValue to [[Event]]
    *
    * Unlike [[readJson]], this reads "tags" and "creationTime" from the JSON
    * and performs no validation.
    *
    * @return deserialization routine used by [[DBSerializer]]
    */
  @DeveloperApi
  def deserializeFromJValue: PartialFunction[JValue, Event] = {
    case json: JValue =>
      // Small readers over the JSON value; the fields are extracted in the
      // order written in the constructor call below.
      def required(name: String): String = (json \ name).extract[String]
      def optional(name: String): Option[String] =
        (json \ name).extract[Option[String]]

      Event(
        event = required("event"),
        entityType = required("entityType"),
        entityId = required("entityId"),
        targetEntityType = optional("targetEntityType"),
        targetEntityId = optional("targetEntityId"),
        properties = DataMap((json \ "properties").extract[JObject]),
        eventTime = DataUtils.stringToDateTime(required("eventTime")),
        tags = (json \ "tags").extract[Seq[String]],
        prId = optional("prId"),
        creationTime = DataUtils.stringToDateTime(required("creationTime")))
  }

  /** :: DeveloperApi ::
    * Convert [[Event]] to JSON4S JValue
    *
    * Writes tags and creationTime, but not eventId.
    *
    * @return serialization routine used by [[DBSerializer]]
    */
  @DeveloperApi
  def serializeToJValue: PartialFunction[Any, JValue] = {
    case e: Event =>
      JObject(List(
        JField("event", JString(e.event)),
        JField("entityType", JString(e.entityType)),
        JField("entityId", JString(e.entityId)),
        JField("targetEntityType", optionalJString(e.targetEntityType)),
        JField("targetEntityId", optionalJString(e.targetEntityId)),
        JField("properties", e.properties.toJObject),
        JField("eventTime", JString(DataUtils.dateTimeToString(e.eventTime))),
        JField("tags", JArray(e.tags.toList.map(JString(_)))),
        JField("prId", optionalJString(e.prId)),
        JField("creationTime",
          JString(DataUtils.dateTimeToString(e.creationTime)))
      ))
  }

  /** :: DeveloperApi ::
    * Custom JSON4S serializer for [[Event]] intended to be used by database
    * access, or anywhere that demands serdes of [[Event]] to/from JSON4S JValue
    */
  @DeveloperApi
  class DBSerializer extends CustomSerializer[Event](_ =>
    (deserializeFromJValue, serializeToJValue))

  /** :: DeveloperApi ::
    * Custom JSON4S serializer for [[Event]] intended to be used by the Event
    * Server, or anywhere that demands serdes of [[Event]] to/from JSON
    */
  @DeveloperApi
  class APISerializer extends CustomSerializer[Event](_ =>
    (readJson, writeJson))
}


/** JSON4S support for a batch of events submitted as one JSON array. */
@DeveloperApi
object BatchEventsJson4sSupport {
  implicit val formats = DefaultFormats

  /** Convert a JSON array into one result per element: a Success holding the
    * [[Event]], or a Failure holding the exception that element raised in
    * [[EventJson4sSupport.readJson]]. One bad element does not fail the batch.
    */
  @DeveloperApi
  def readJson: PartialFunction[JValue, Seq[Try[Event]]] = {
    case JArray(elements) =>
      def parseOne(element: JValue): Try[Event] =
        try {
          Success(EventJson4sSupport.readJson(element))
        } catch {
          case e: Exception => Failure(e)
        }

      elements.map(parseOne)
  }

  /** Deserialize-only JSON4S serializer for a batch of events; the
    * serialization side is an empty partial function.
    */
  @DeveloperApi
  class APISerializer extends CustomSerializer[Seq[Try[Event]]](_ =>
    (readJson, Map.empty))
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala b/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala deleted file mode 100644 index f3c4b11..0000000 --- a/data/src/main/scala/io/prediction/data/storage/LEventAggregator.scala +++ /dev/null @@ -1,145 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi -import org.joda.time.DateTime - -
/** :: DeveloperApi ::
  * Provides aggregation support of [[Event]]s to [[LEvents]]. Engine developers
  * should use [[io.prediction.data.store.LEventStore]] instead of using this
  * directly.
  *
  * @group Event Data
  */
@DeveloperApi
object LEventAggregator {
  /** :: DeveloperApi ::
    * Aggregate all properties grouped by entity ID given an iterator of
    * [[Event]]s with the latest property values from all [[Event]]s, and their
    * first and last updated time.
    *
    * Entities whose aggregation ends without properties (never `\$set`, or
    * last `\$delete`d) are left out of the result. The returned map is a
    * strict map, not a lazily evaluated view.
    *
    * @param events An iterator of [[Event]]s whose properties will be aggregated
    * @return A map of entity ID to [[PropertyMap]]
    */
  @DeveloperApi
  def aggregateProperties(events: Iterator[Event]): Map[String, PropertyMap] = {
    events.toList
      .groupBy(_.entityId)
      .map { case (entityId, entityEvents) =>
        entityId -> toPropertyMap(aggregate(entityEvents))
      }
      .collect { case (entityId, Some(propertyMap)) => entityId -> propertyMap }
  }

  /** :: DeveloperApi ::
    * Aggregate all properties given an iterator of [[Event]]s with the latest
    * property values from all [[Event]]s, and their first and last updated time
    *
    * All events are folded together regardless of their entity ID.
    *
    * @param events An iterator of [[Event]]s whose properties will be aggregated
    * @return An optional [[PropertyMap]]; None when the aggregation ends
    *         without properties
    */
  @DeveloperApi
  def aggregatePropertiesSingle(events: Iterator[Event])
  : Option[PropertyMap] = toPropertyMap(aggregate(events.toList))

  /** Event names that control aggregation: \$set, \$unset, and \$delete */
  val eventNames = List("$set", "$unset", "$delete")

  /** Folds events, ordered by event time, into a single aggregation state.
    * sortBy is stable, so events sharing a timestamp keep their input order.
    */
  private def aggregate(events: List[Event]): Prop =
    events.sortBy(_.eventTime.getMillis).foldLeft(Prop())(propAggregator)

  /** Turns a final aggregation state into a [[PropertyMap]], or None when the
    * state holds no properties.
    */
  private def toPropertyMap(prop: Prop): Option[PropertyMap] =
    prop.dm.flatMap { dm =>
      // Both timestamps are set by the same events that can populate dm, so
      // these only guard against an internal inconsistency.
      require(prop.firstUpdated.isDefined,
        "Unexpected Error: firstUpdated cannot be None.")
      require(prop.lastUpdated.isDefined,
        "Unexpected Error: lastUpdated cannot be None.")

      for {
        firstUpdated <- prop.firstUpdated
        lastUpdated <- prop.lastUpdated
      } yield PropertyMap(
        fields = dm.fields,
        firstUpdated = firstUpdated,
        lastUpdated = lastUpdated
      )
    }

  /** Applies one event to the current properties of an entity. */
  private
  def dataMapAggregator: ((Option[DataMap], Event) => Option[DataMap]) = {
    (current, e) => e.event match {
      // merge into the existing properties, or start from this event's
      case "$set" => current.map(_ ++ e.properties).orElse(Some(e.properties))
      // unsetting on an entity without properties leaves it without any
      case "$unset" => current.map(_ -- e.properties.keySet)
      case "$delete" => None
      case _ => current // do nothing for others
    }
  }

  /** Applies one event to the aggregation state: updates the properties and
    * widens the first/last updated range. Events other than \$set, \$unset and
    * \$delete leave the state untouched.
    */
  private
  def propAggregator: ((Prop, Event) => Prop) = {
    (state, e) => e.event match {
      case "$set" | "$unset" | "$delete" =>
        Prop(
          dm = dataMapAggregator(state.dm, e),
          firstUpdated = state.firstUpdated
            .map(first(_, e.eventTime))
            .orElse(Some(e.eventTime)),
          lastUpdated = state.lastUpdated
            .map(last(_, e.eventTime))
            .orElse(Some(e.eventTime))
        )
      case _ => state // do nothing for others
    }
  }

  /** The earlier of two instants; `a` when neither is before the other. */
  private
  def first(a: DateTime, b: DateTime): DateTime = if (b.isBefore(a)) b else a

  /** The later of two instants; `a` when neither is after the other. */
  private
  def last(a: DateTime, b: DateTime): DateTime = if (b.isAfter(a)) b else a

  /** Aggregation state: the current properties (None when the entity has
    * none) and the event-time range of the control events seen so far.
    */
  private case class Prop(
    dm: Option[DataMap] = None,
    firstUpdated: Option[DateTime] = None,
    lastUpdated: Option[DateTime] = None
  )
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/LEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/LEvents.scala b/data/src/main/scala/io/prediction/data/storage/LEvents.scala deleted file mode 100644 index 411f3a4..0000000 ---
a/data/src/main/scala/io/prediction/data/storage/LEvents.scala +++ /dev/null @@ -1,489 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import io.prediction.annotation.DeveloperApi -import io.prediction.annotation.Experimental - -import scala.concurrent.Future -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.concurrent.ExecutionContext -import scala.concurrent.TimeoutException - -import org.joda.time.DateTime - -
/** :: DeveloperApi ::
  * Base trait of a data access object that directly returns [[Event]] without
  * going through Spark's parallelization. Engine developers should use
  * [[io.prediction.data.store.LEventStore]] instead of using this directly.
  *
  * @group Event Data
  */
@DeveloperApi
trait LEvents {
  /** Default timeout for asynchronous operations that is set to 1 minute */
  val defaultTimeout = Duration(60, "seconds")

  /** :: DeveloperApi ::
    * Initialize Event Store for an app ID and optionally a channel ID.
    * This routine is to be called when an app is first created.
    *
    * @param appId App ID
    * @param channelId Optional channel ID
    * @return true if initialization was successful; false otherwise.
    */
  @DeveloperApi
  def init(appId: Int, channelId: Option[Int] = None): Boolean

  /** :: DeveloperApi ::
    * Remove Event Store for an app ID and optional channel ID.
    *
    * @param appId App ID
    * @param channelId Optional channel ID
    * @return true if removal was successful; false otherwise.
    */
  @DeveloperApi
  def remove(appId: Int, channelId: Option[Int] = None): Boolean

  /** :: DeveloperApi ::
    * Close this Event Store interface object, e.g. close connection, release
    * resources, etc.
    */
  @DeveloperApi
  def close(): Unit

  /** :: DeveloperApi ::
    * Insert an [[Event]] in a non-blocking fashion, without a channel ID.
    *
    * @param event An [[Event]] to be inserted
    * @param appId App ID for the [[Event]] to be inserted to
    */
  @DeveloperApi
  def futureInsert(event: Event, appId: Int)(implicit ec: ExecutionContext):
    Future[String] = futureInsert(event, appId, None)

  /** :: DeveloperApi ::
    * Insert an [[Event]] in a non-blocking fashion.
    *
    * @param event An [[Event]] to be inserted
    * @param appId App ID for the [[Event]] to be inserted to
    * @param channelId Optional channel ID for the [[Event]] to be inserted to
    */
  @DeveloperApi
  def futureInsert(
    event: Event,
    appId: Int,
    channelId: Option[Int]
  )(implicit ec: ExecutionContext): Future[String]

  /** :: DeveloperApi ::
    * Get an [[Event]] in a non-blocking fashion, without a channel ID.
    *
    * @param eventId ID of the [[Event]]
    * @param appId ID of the app that contains the [[Event]]
    */
  @DeveloperApi
  def futureGet(eventId: String, appId: Int)(implicit ec: ExecutionContext):
    Future[Option[Event]] = futureGet(eventId, appId, None)

  /** :: DeveloperApi ::
    * Get an [[Event]] in a non-blocking fashion.
    *
    * @param eventId ID of the [[Event]]
    * @param appId ID of the app that contains the [[Event]]
    * @param channelId Optional channel ID that contains the [[Event]]
    */
  @DeveloperApi
  def futureGet(
    eventId: String,
    appId: Int,
    channelId: Option[Int]
  )(implicit ec: ExecutionContext): Future[Option[Event]]

  /** :: DeveloperApi ::
    * Delete an [[Event]] in a non-blocking fashion, without a channel ID.
    *
    * @param eventId ID of the [[Event]]
    * @param appId ID of the app that contains the [[Event]]
    */
  @DeveloperApi
  def futureDelete(eventId: String, appId: Int)(implicit ec: ExecutionContext):
    Future[Boolean] = futureDelete(eventId, appId, None)

  /** :: DeveloperApi ::
    * Delete an [[Event]] in a non-blocking fashion.
    *
    * @param eventId ID of the [[Event]]
    * @param appId ID of the app that contains the [[Event]]
    * @param channelId Optional channel ID that contains the [[Event]]
    */
  @DeveloperApi
  def futureDelete(
    eventId: String,
    appId: Int,
    channelId: Option[Int]
  )(implicit ec: ExecutionContext): Future[Boolean]

  /** :: DeveloperApi ::
    * Reads from database and returns a Future of Iterator of [[Event]]s.
    *
    * @param appId return events of this app ID
    * @param channelId return events of this channel ID (default channel if it's None)
    * @param startTime return events with eventTime >= startTime
    * @param untilTime return events with eventTime < untilTime
    * @param entityType return events of this entityType
    * @param entityId return events of this entityId
    * @param eventNames return events with any of these event names.
    * @param targetEntityType return events of this targetEntityType:
    *   - None means no restriction on targetEntityType
    *   - Some(None) means no targetEntityType for this event
    *   - Some(Some(x)) means targetEntityType should match x.
    * @param targetEntityId return events of this targetEntityId
    *   - None means no restriction on targetEntityId
    *   - Some(None) means no targetEntityId for this event
    *   - Some(Some(x)) means targetEntityId should match x.
    * @param limit Limit number of events. Get all events if None or Some(-1)
    * @param reversed Reverse the order.
    *   - return oldest events first if None or Some(false) (default)
    *   - return latest events first if Some(true)
    * @param ec ExecutionContext
    * @return Future[Iterator[Event]]
    */
  @DeveloperApi
  def futureFind(
    appId: Int,
    channelId: Option[Int] = None,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    entityType: Option[String] = None,
    entityId: Option[String] = None,
    eventNames: Option[Seq[String]] = None,
    targetEntityType: Option[Option[String]] = None,
    targetEntityId: Option[Option[String]] = None,
    limit: Option[Int] = None,
    reversed: Option[Boolean] = None
  )(implicit ec: ExecutionContext): Future[Iterator[Event]]

  /** Aggregate properties of entities based on these special events:
    * \$set, \$unset, \$delete events.
    * and returns a Future of Map of entityId to properties.
    *
    * @param appId use events of this app ID
    * @param channelId use events of this channel ID (default channel if it's None)
    * @param entityType aggregate properties of the entities of this entityType
    * @param startTime use events with eventTime >= startTime
    * @param untilTime use events with eventTime < untilTime
    * @param required only keep entities with these required properties defined;
    *   None, or an empty list, keeps every entity
    * @param ec ExecutionContext
    * @return Future[Map[String, PropertyMap]]
    */
  private[prediction] def futureAggregateProperties(
    appId: Int,
    channelId: Option[Int] = None,
    entityType: String,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    required: Option[Seq[String]] = None)(implicit ec: ExecutionContext):
    Future[Map[String, PropertyMap]] = {
    futureFind(
      appId = appId,
      channelId = channelId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = Some(entityType),
      eventNames = Some(LEventAggregator.eventNames)
    ).map { eventIt =>
      val aggregated = LEventAggregator.aggregateProperties(eventIt)
      required match {
        // forall is true for an empty list, whereas reducing an empty list
        // of booleans throws.
        case Some(keys) =>
          aggregated.filter { case (_, properties) =>
            keys.forall(key => properties.contains(key))
          }
        case None => aggregated
      }
    }
  }

  /**
    * :: Experimental ::
    *
    * Aggregate properties of the specified entity (entityType + entityId)
    * based on these special events:
    * \$set, \$unset, \$delete events.
    * and returns a Future of Option[PropertyMap]
    *
    * @param appId use events of this app ID
    * @param channelId use events of this channel ID (default channel if it's None)
    * @param entityType the entityType
    * @param entityId the entityId
    * @param startTime use events with eventTime >= startTime
    * @param untilTime use events with eventTime < untilTime
    * @param ec ExecutionContext
    * @return Future[Option[PropertyMap]]
    */
  @Experimental
  private[prediction] def futureAggregatePropertiesOfEntity(
    appId: Int,
    channelId: Option[Int] = None,
    entityType: String,
    entityId: String,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None)(implicit ec: ExecutionContext):
    Future[Option[PropertyMap]] = {
    futureFind(
      appId = appId,
      channelId = channelId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = Some(entityType),
      entityId = Some(entityId),
      eventNames = Some(LEventAggregator.eventNames)
    ).map { eventIt =>
      LEventAggregator.aggregatePropertiesSingle(eventIt)
    }
  }

  // The methods below are blocking: each one awaits the matching future for
  // at most `timeout`.

  /** Blocking variant of futureInsert.
    *
    * @param timeout how long to wait for the insert to complete
    * @return the String produced by futureInsert
    */
  private[prediction] def insert(event: Event, appId: Int,
    channelId: Option[Int] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    String = {
    Await.result(futureInsert(event, appId, channelId), timeout)
  }

  /** Blocking variant of futureGet.
    *
    * @param timeout how long to wait for the lookup to complete
    */
  private[prediction] def get(eventId: String, appId: Int,
    channelId: Option[Int] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Option[Event] = {
    Await.result(futureGet(eventId, appId, channelId), timeout)
  }

  /** Blocking variant of futureDelete.
    *
    * @param timeout how long to wait for the deletion to complete
    */
  private[prediction] def delete(eventId: String, appId: Int,
    channelId: Option[Int] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Boolean = {
    Await.result(futureDelete(eventId, appId, channelId), timeout)
  }

  /** reads from database and returns events iterator.
    *
    * @param appId return events of this app ID
    * @param channelId return events of this channel ID (default channel if it's None)
    * @param startTime return events with eventTime >= startTime
    * @param untilTime return events with eventTime < untilTime
    * @param entityType return events of this entityType
    * @param entityId return events of this entityId
    * @param eventNames return events with any of these event names.
    * @param targetEntityType return events of this targetEntityType:
    *   - None means no restriction on targetEntityType
    *   - Some(None) means no targetEntityType for this event
    *   - Some(Some(x)) means targetEntityType should match x.
    * @param targetEntityId return events of this targetEntityId
    *   - None means no restriction on targetEntityId
    *   - Some(None) means no targetEntityId for this event
    *   - Some(Some(x)) means targetEntityId should match x.
    * @param limit Limit number of events. Get all events if None or Some(-1)
    * @param reversed Reverse the order (should be used with both
    *   targetEntityType and targetEntityId specified)
    *   - return oldest events first if None or Some(false) (default)
    *   - return latest events first if Some(true)
    * @param timeout how long to wait for the query to complete
    * @param ec ExecutionContext
    * @return Iterator[Event]
    */
  private[prediction] def find(
    appId: Int,
    channelId: Option[Int] = None,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    entityType: Option[String] = None,
    entityId: Option[String] = None,
    eventNames: Option[Seq[String]] = None,
    targetEntityType: Option[Option[String]] = None,
    targetEntityId: Option[Option[String]] = None,
    limit: Option[Int] = None,
    reversed: Option[Boolean] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Iterator[Event] = {
    Await.result(futureFind(
      appId = appId,
      channelId = channelId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = entityType,
      entityId = entityId,
      eventNames = eventNames,
      targetEntityType = targetEntityType,
      targetEntityId = targetEntityId,
      limit = limit,
      reversed = reversed), timeout)
  }

  /** Same query as find(), but failures are returned as a Left holding a
    * StorageError built from the exception's string form instead of being
    * thrown.
    */
  // NOTE: remove in next release
  @deprecated("Use find() instead.", "0.9.2")
  private[prediction] def findLegacy(
    appId: Int,
    channelId: Option[Int] = None,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    entityType: Option[String] = None,
    entityId: Option[String] = None,
    eventNames: Option[Seq[String]] = None,
    targetEntityType: Option[Option[String]] = None,
    targetEntityId: Option[Option[String]] = None,
    limit: Option[Int] = None,
    reversed: Option[Boolean] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Either[StorageError, Iterator[Event]] = {
    try {
      // return Either for legacy usage
      Right(Await.result(futureFind(
        appId = appId,
        channelId = channelId,
        startTime = startTime,
        untilTime = untilTime,
        entityType = entityType,
        entityId = entityId,
        eventNames = eventNames,
        targetEntityType = targetEntityType,
        targetEntityId = targetEntityId,
        limit = limit,
        reversed = reversed), timeout))
    } catch {
      // A timeout is handled exactly like any other exception; the separate
      // case only makes the expected failure of Await.result explicit.
      case e: TimeoutException => Left(StorageError(s"${e}"))
      case e: Exception => Left(StorageError(s"${e}"))
    }
  }

  /** reads events of the specified entity.
    *
    * @param appId return events of this app ID
    * @param channelId return events of this channel ID (default channel if it's None)
    * @param entityType return events of this entityType
    * @param entityId return events of this entityId
    * @param eventNames return events with any of these event names.
    * @param targetEntityType return events of this targetEntityType:
    *   - None means no restriction on targetEntityType
    *   - Some(None) means no targetEntityType for this event
    *   - Some(Some(x)) means targetEntityType should match x.
    * @param targetEntityId return events of this targetEntityId
    *   - None means no restriction on targetEntityId
    *   - Some(None) means no targetEntityId for this event
    *   - Some(Some(x)) means targetEntityId should match x.
    * @param startTime return events with eventTime >= startTime
    * @param untilTime return events with eventTime < untilTime
    * @param limit Limit number of events. Get all events if None or Some(-1)
    * @param latest Return latest event first (default true)
    * @param timeout how long to wait for the query to complete
    * @param ec ExecutionContext
    * @return Either[StorageError, Iterator[Event]]
    */
  // NOTE: remove this function in next release
  @deprecated("Use LEventStore.findByEntity() instead.", "0.9.2")
  def findSingleEntity(
    appId: Int,
    channelId: Option[Int] = None,
    entityType: String,
    entityId: String,
    eventNames: Option[Seq[String]] = None,
    targetEntityType: Option[Option[String]] = None,
    targetEntityId: Option[Option[String]] = None,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    limit: Option[Int] = None,
    latest: Boolean = true,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Either[StorageError, Iterator[Event]] = {

    findLegacy(
      appId = appId,
      channelId = channelId,
      startTime = startTime,
      untilTime = untilTime,
      entityType = Some(entityType),
      entityId = Some(entityId),
      eventNames = eventNames,
      targetEntityType = targetEntityType,
      targetEntityId = targetEntityId,
      limit = limit,
      reversed = Some(latest),
      timeout = timeout)
  }

  /** Aggregate properties of entities based on these special events:
    * \$set, \$unset, \$delete events.
    * and returns a Map of entityId to properties.
    *
    * @param appId use events of this app ID
    * @param channelId use events of this channel ID (default channel if it's None)
    * @param entityType aggregate properties of the entities of this entityType
    * @param startTime use events with eventTime >= startTime
    * @param untilTime use events with eventTime < untilTime
    * @param required only keep entities with these required properties defined
    * @param timeout how long to wait for the aggregation to complete
    * @param ec ExecutionContext
    * @return Map[String, PropertyMap]
    */
  private[prediction] def aggregateProperties(
    appId: Int,
    channelId: Option[Int] = None,
    entityType: String,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    required: Option[Seq[String]] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Map[String, PropertyMap] = {
    Await.result(futureAggregateProperties(
      appId = appId,
      channelId = channelId,
      entityType = entityType,
      startTime = startTime,
      untilTime = untilTime,
      required = required), timeout)
  }

  /**
    * :: Experimental ::
    *
    * Aggregate properties of the specified entity (entityType + entityId)
    * based on these special events:
    * \$set, \$unset, \$delete events.
    * and returns Option[PropertyMap]
    *
    * @param appId use events of this app ID
    * @param channelId use events of this channel ID
    * @param entityType the entityType
    * @param entityId the entityId
    * @param startTime use events with eventTime >= startTime
    * @param untilTime use events with eventTime < untilTime
    * @param timeout how long to wait for the aggregation to complete
    * @param ec ExecutionContext
    * @return Option[PropertyMap]
    */
  @Experimental
  private[prediction] def aggregatePropertiesOfEntity(
    appId: Int,
    channelId: Option[Int] = None,
    entityType: String,
    entityId: String,
    startTime: Option[DateTime] = None,
    untilTime: Option[DateTime] = None,
    timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext):
    Option[PropertyMap] = {

    Await.result(futureAggregatePropertiesOfEntity(
      appId = appId,
      channelId = channelId,
      entityType = entityType,
      entityId = entityId,
      startTime = startTime,
      untilTime = untilTime), timeout)
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Models.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/Models.scala b/data/src/main/scala/io/prediction/data/storage/Models.scala deleted file mode 100644 index 53a76ff..0000000 --- a/data/src/main/scala/io/prediction/data/storage/Models.scala +++ /dev/null @@ -1,80 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import com.google.common.io.BaseEncoding -import io.prediction.annotation.DeveloperApi -import org.json4s._ - -
/** :: DeveloperApi ::
  * Stores model for each engine instance
  *
  * @param id ID of the model, which should be the same as engine instance ID
  * @param models Trained models of all algorithms
  * @group Model Data
  */
@DeveloperApi
case class Model(
  id: String,
  models: Array[Byte])

/** :: DeveloperApi ::
  * Base trait for of the [[Model]] data access object
  *
  * @group Model Data
  */
@DeveloperApi
trait Models {
  /** Insert a new [[Model]] */
  def insert(i: Model): Unit

  /** Get a [[Model]] by ID */
  def get(id: String): Option[Model]

  /** Delete a [[Model]] */
  def delete(id: String): Unit
}

/** :: DeveloperApi ::
  * JSON4S serializer for [[Model]]
  *
  * A model is written as a JSON object with a string "id" and a "models"
  * string holding the Base64 encoding of the bytes. On reading, fields other
  * than a string-valued "id" or "models" are ignored, and a missing field
  * keeps its default (empty ID, empty byte array).
  *
  * @group Model Data
  */
@DeveloperApi
class ModelSerializer extends CustomSerializer[Model](
  _ => ({
    case JObject(fields) =>
      // Start from an empty model and let each recognised field overwrite it,
      // so a repeated field resolves to its last occurrence.
      fields.foldLeft(Model(id = "", models = Array[Byte]())) {
        case (model, JField("id", JString(id))) =>
          model.copy(id = id)
        case (model, JField("models", JString(encoded))) =>
          model.copy(models = BaseEncoding.base64.decode(encoded))
        case (model, _) =>
          model
      }
  },
  {
    case model: Model =>
      JObject(List(
        JField("id", JString(model.id)),
        JField("models", JString(BaseEncoding.base64.encode(model.models)))
      ))
  }
))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala b/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala deleted file mode 100644 index
2430df9..0000000 --- a/data/src/main/scala/io/prediction/data/storage/PEventAggregator.scala +++ /dev/null @@ -1,209 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.joda.time.DateTime - -import org.json4s.JValue - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -
/** A property value together with the time (epoch millis) at which it was set.
  *
  * @param d the JSON value
  * @param t the time the value was set
  */
private[prediction] case class PropTime(val d: JValue, val t: Long)
  extends Serializable

/** The combined effect of one or more `\$set` events on an entity.
  *
  * @param fields property name to its value and set time
  * @param t last set time. Note: fields could be empty with valid set time
  */
private[prediction] case class SetProp (
  val fields: Map[String, PropTime],
  val t: Long) extends Serializable {

  /** Merges two set operations. A property present on both sides keeps the
    * value with the later set time (`that` wins a tie); the result carries
    * the later of the two set times.
    */
  def ++ (that: SetProp): SetProp = {
    val combinedFields = that.fields.foldLeft(fields) {
      case (merged, (name, thatData)) =>
        merged.get(name) match {
          // this side was set strictly later: keep it
          case Some(thisData) if thisData.t > thatData.t => merged
          case _ => merged.updated(name, thatData)
        }
    }

    SetProp(
      fields = combinedFields,
      t = if (this.t > that.t) this.t else that.t
    )
  }
}

/** The combined effect of one or more `\$unset` events on an entity.
  *
  * @param fields property name to the time it was unset
  */
private[prediction] case class UnsetProp (fields: Map[String, Long])
  extends Serializable {

  /** Merges two unset operations, keeping the later unset time of a property
    * present on both sides.
    */
  def ++ (that: UnsetProp): UnsetProp = {
    val combinedFields = that.fields.foldLeft(fields) {
      case (merged, (name, thatTime)) =>
        merged.get(name) match {
          case Some(thisTime) if thisTime > thatTime => merged
          case _ => merged.updated(name, thatTime)
        }
    }

    UnsetProp(fields = combinedFields)
  }
}

/** The effect of one or more `\$delete` events on an entity.
  *
  * @param t time of the delete
  */
private[prediction] case class DeleteEntity (t: Long) extends Serializable {
  /** Keeps the later of two deletes (`that` wins a tie). */
  def ++ (that: DeleteEntity): DeleteEntity = {
    if (this.t > that.t) this else that
  }
}

/** The combined effect of a group of events on one entity: what was set,
  * unset and deleted, and the event-time range of those events. Instances
  * are merged with `++`.
  */
private[prediction] case class EventOp (
  val setProp: Option[SetProp] = None,
  val unsetProp: Option[UnsetProp] = None,
  val deleteEntity: Option[DeleteEntity] = None,
  val firstUpdated: Option[DateTime] = None,
  val lastUpdated: Option[DateTime] = None
) extends Serializable {

  /** Merges two operations: each component is merged with its own `++`, and
    * the updated range becomes the earliest first / latest last of the two.
    */
  def ++ (that: EventOp): EventOp = {
    val firstUp = (this.firstUpdated ++ that.firstUpdated).reduceOption {
      (a, b) => if (b.getMillis < a.getMillis) b else a
    }
    val lastUp = (this.lastUpdated ++ that.lastUpdated).reduceOption {
      (a, b) => if (b.getMillis > a.getMillis) b else a
    }

    EventOp(
      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _),
      firstUpdated = firstUp,
      lastUpdated = lastUp
    )
  }

  /** Resolves the accumulated operations into the entity's properties.
    *
    * @return None when nothing was ever set, or when the entity was deleted
    *         at or after its last set time; otherwise the properties that
    *         survive the unsets and the delete
    */
  def toPropertyMap(): Option[PropertyMap] = {
    setProp.flatMap { set =>

      // A property is unset when its unset time is at or after its set time.
      // An unset of a property that was never set has nothing to remove, so
      // it is ignored rather than looked up unconditionally.
      val unsetKeys: Set[String] = unsetProp.map { unset =>
        unset.fields.filter { case (name, unsetTime) =>
          set.fields.get(name).exists(prop => unsetTime >= prop.t)
        }.keySet
      }.getOrElse(Set())

      val combinedFields: Option[Map[String, PropTime]] = deleteEntity match {
        // deleted at or after the last set: the entity is gone
        case Some(delete) if delete.t >= set.t => None
        // deleted earlier: drop only the properties set up to the delete
        case Some(delete) =>
          val deleteKeys: Set[String] = set.fields
            .filter { case (_, PropTime(_, setTime)) => delete.t >= setTime }
            .keySet
          Some(set.fields -- unsetKeys -- deleteKeys)
        case None =>
          Some(set.fields -- unsetKeys)
      }

      combinedFields.map { surviving =>
        require(firstUpdated.isDefined,
          "Unexpected Error: firstUpdated cannot be None.")
        require(lastUpdated.isDefined,
          "Unexpected Error: lastUpdated cannot be None.")
        PropertyMap(
          // map (not mapValues) builds a concrete Map; mapValues returns a
          // view that causes NotSerializableException.
          // see https://issues.scala-lang.org/browse/SI-7005
          fields = surviving.map { case (name, prop) => name -> prop.d },
          firstUpdated = firstUpdated.get,
          lastUpdated = lastUpdated.get
        )
      }
    }
  }

}

private[prediction] object EventOp {
  /** Creates the [[EventOp]] that a single [[Event]] stands for. Events other
    * than \$set, \$unset and \$delete yield an empty operation.
    */
  def apply(e: Event): EventOp = {
    val t = e.eventTime.getMillis
    e.event match {
      case "$set" =>
        val fields = e.properties.fields.map { case (name, jv) =>
          name -> PropTime(jv, t)
        }
        EventOp(
          setProp = Some(SetProp(fields = fields, t = t)),
          firstUpdated = Some(e.eventTime),
          lastUpdated = Some(e.eventTime)
        )
      case "$unset" =>
        // only the property names matter for an unset; the values are dropped
        val fields = e.properties.fields.map { case (name, _) => name -> t }
        EventOp(
          unsetProp = Some(UnsetProp(fields = fields)),
          firstUpdated = Some(e.eventTime),
          lastUpdated = Some(e.eventTime)
        )
      case "$delete" =>
        EventOp(
          deleteEntity = Some(DeleteEntity(t)),
          firstUpdated = Some(e.eventTime),
          lastUpdated = Some(e.eventTime)
        )
      case _ =>
        EventOp()
    }
  }
}


private[prediction] object PEventAggregator {

  /** Event names that control aggregation: \$set, \$unset, and \$delete */
  val eventNames = List("$set", "$unset", "$delete")

  /** Aggregates the properties of every entity in an RDD of events.
    *
    * @param eventsRDD the events to aggregate
    * @return entity ID paired with its [[PropertyMap]]; entities that resolve
    *         to no properties are left out
    */
  def aggregateProperties(eventsRDD: RDD[Event]): RDD[(String, PropertyMap)] = {
    eventsRDD
      .map(e => (e.entityId, EventOp(e)))
      .aggregateByKey[EventOp](EventOp())(
        // within same partition
        seqOp = { case (u, v) => u ++ v },
        // across partition
        combOp = { case (accu, u) => accu ++ u }
      )
      .mapValues(_.toPropertyMap())
      // RDD.collect with a partial function is a filter-and-map
      // transformation, not the collect-to-driver action
      .collect { case (entityId, Some(propertyMap)) => (entityId, propertyMap) }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/PEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/PEvents.scala b/data/src/main/scala/io/prediction/data/storage/PEvents.scala deleted file mode 100644 index 96a11b8..0000000 --- a/data/src/main/scala/io/prediction/data/storage/PEvents.scala +++ /dev/null @@ -1,182 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package io.prediction.data.storage - -import grizzled.slf4j.Logger -import io.prediction.annotation.DeveloperApi -import io.prediction.annotation.Experimental -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.joda.time.DateTime - -import scala.reflect.ClassTag - -/** :: DeveloperApi :: - * Base trait of a data access object that returns [[Event]] related RDD data - * structure. Engine developers should use - * [[io.prediction.data.store.PEventStore]] instead of using this directly. - * - * @group Event Data - */ -@DeveloperApi -trait PEvents extends Serializable { - @transient protected lazy val logger = Logger[this.type] - @deprecated("Use PEventStore.find() instead.", "0.9.2") - def getByAppIdAndTimeAndEntity(appId: Int, - startTime: Option[DateTime], - untilTime: Option[DateTime], - entityType: Option[String], - entityId: Option[String])(sc: SparkContext): RDD[Event] = { - find( - appId = appId, - startTime = startTime, - untilTime = untilTime, - entityType = entityType, - entityId = entityId, - eventNames = None - )(sc) - } - - /** :: DeveloperApi :: - * Read from database and return the events. The deprecation here is intended - * to engine developers only. - * - * @param appId return events of this app ID - * @param channelId return events of this channel ID (default channel if it's None) - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param entityType return events of this entityType - * @param entityId return events of this entityId - * @param eventNames return events with any of these event names. - * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. 
- * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. - * @param sc Spark context - * @return RDD[Event] - */ - @deprecated("Use PEventStore.find() instead.", "0.9.2") - @DeveloperApi - def find( - appId: Int, - channelId: Option[Int] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] - - /** Aggregate properties of entities based on these special events: - * \$set, \$unset, \$delete events. The deprecation here is intended to - * engine developers only. - * - * @param appId use events of this app ID - * @param channelId use events of this channel ID (default channel if it's None) - * @param entityType aggregate properties of the entities of this entityType - * @param startTime use events with eventTime >= startTime - * @param untilTime use events with eventTime < untilTime - * @param required only keep entities with these required properties defined - * @param sc Spark context - * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair - */ - @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2") - def aggregateProperties( - appId: Int, - channelId: Option[Int] = None, - entityType: String, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - required: Option[Seq[String]] = None) - (sc: SparkContext): RDD[(String, PropertyMap)] = { - val eventRDD = find( - appId = appId, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - entityType = Some(entityType), - eventNames = Some(PEventAggregator.eventNames))(sc) - - val dmRDD = 
PEventAggregator.aggregateProperties(eventRDD) - - required map { r => - dmRDD.filter { case (k, v) => - r.map(v.contains(_)).reduce(_ && _) - } - } getOrElse dmRDD - } - - /** :: Experimental :: - * Extract EntityMap[A] from events for the entityType - * NOTE: it is local EntityMap[A] - */ - @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2") - @Experimental - def extractEntityMap[A: ClassTag]( - appId: Int, - entityType: String, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - required: Option[Seq[String]] = None) - (sc: SparkContext)(extract: DataMap => A): EntityMap[A] = { - val idToData: Map[String, A] = aggregateProperties( - appId = appId, - entityType = entityType, - startTime = startTime, - untilTime = untilTime, - required = required - )(sc).map{ case (id, dm) => - try { - (id, extract(dm)) - } catch { - case e: Exception => { - logger.error(s"Failed to get extract entity from DataMap $dm of " + - s"entityId $id.", e) - throw e - } - } - }.collectAsMap.toMap - - new EntityMap(idToData) - } - - /** :: DeveloperApi :: - * Write events to database - * - * @param events RDD of Event - * @param appId the app ID - * @param sc Spark Context - */ - @DeveloperApi - def write(events: RDD[Event], appId: Int)(sc: SparkContext): Unit = - write(events, appId, None)(sc) - - /** :: DeveloperApi :: - * Write events to database - * - * @param events RDD of Event - * @param appId the app ID - * @param channelId channel ID (default channel if it's None) - * @param sc Spark Context - */ - @DeveloperApi - def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala 
b/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala deleted file mode 100644 index bc55fd3..0000000 --- a/data/src/main/scala/io/prediction/data/storage/PropertyMap.scala +++ /dev/null @@ -1,96 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prediction.data.storage - -import org.joda.time.DateTime - -import org.json4s.JValue -import org.json4s.JObject -import org.json4s.native.JsonMethods.parse - -/** A PropertyMap stores aggregated properties of the entity. - * Internally it is a Map - * whose keys are property names and values are corresponding JSON values - * respectively. Use the get() method to retrieve the value of mandatory - * property or use getOpt() to retrieve the value of the optional property. 
- * - * @param fields Map of property name to JValue - * @param firstUpdated first updated time of this PropertyMap - * @param lastUpdated last updated time of this PropertyMap - */ -class PropertyMap( - fields: Map[String, JValue], - val firstUpdated: DateTime, - val lastUpdated: DateTime -) extends DataMap(fields) { - - override - def toString: String = s"PropertyMap(${fields}, ${firstUpdated}, ${lastUpdated})" - - override - def hashCode: Int = - 41 * ( - 41 * ( - 41 + fields.hashCode - ) + firstUpdated.hashCode - ) + lastUpdated.hashCode - - override - def equals(other: Any): Boolean = other match { - case that: PropertyMap => { - (that.canEqual(this)) && - (super.equals(that)) && - (this.firstUpdated.equals(that.firstUpdated)) && - (this.lastUpdated.equals(that.lastUpdated)) - } - case that: DataMap => { // for testing purpose - super.equals(that) - } - case _ => false - } - - override - def canEqual(other: Any): Boolean = other.isInstanceOf[PropertyMap] -} - -/** Companion object of the [[PropertyMap]] class. */ -object PropertyMap { - - /** Create an PropertyMap from a Map of String to JValue, - * firstUpdated and lastUpdated time. - * - * @param fields a Map of String to JValue - * @param firstUpdated First updated time - * @param lastUpdated Last updated time - * @return a new PropertyMap - */ - def apply(fields: Map[String, JValue], - firstUpdated: DateTime, lastUpdated: DateTime): PropertyMap = - new PropertyMap(fields, firstUpdated, lastUpdated) - - /** Create an PropertyMap from a JSON String and firstUpdated and lastUpdated - * time. - * @param js JSON String. eg """{ "a": 1, "b": "foo" }""" - * @param firstUpdated First updated time - * @param lastUpdated Last updated time - * @return a new PropertyMap - */ - def apply(js: String, firstUpdated: DateTime, lastUpdated: DateTime) - : PropertyMap = apply( - fields = parse(js).asInstanceOf[JObject].obj.toMap, - firstUpdated = firstUpdated, - lastUpdated = lastUpdated - ) -}
