http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala new file mode 100644 index 0000000..6169a02 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Event.scala @@ -0,0 +1,164 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi +import org.joda.time.DateTime +import org.joda.time.DateTimeZone + +/** Each event in the Event Store can be represented by fields in this case + * class. + * + * @param eventId Unique ID of this event. + * @param event Name of this event. + * @param entityType Type of the entity associated with this event. + * @param entityId ID of the entity associated with this event. + * @param targetEntityType Type of the target entity associated with this + * event. + * @param targetEntityId ID of the target entity associated with this event. + * @param properties Properties associated with this event. + * @param eventTime Time of the happening of this event. + * @param tags Tags of this event. + * @param prId PredictedResultId of this event. 
+ * @param creationTime Time of creation in the system of this event. + * @group Event Data + */ +case class Event( + val eventId: Option[String] = None, + val event: String, + val entityType: String, + val entityId: String, + val targetEntityType: Option[String] = None, + val targetEntityId: Option[String] = None, + val properties: DataMap = DataMap(), // default empty + val eventTime: DateTime = DateTime.now, + val tags: Seq[String] = Nil, + val prId: Option[String] = None, + val creationTime: DateTime = DateTime.now +) { + override def toString(): String = { + s"Event(id=$eventId,event=$event,eType=$entityType,eId=$entityId," + + s"tType=$targetEntityType,tId=$targetEntityId,p=$properties,t=$eventTime," + + s"tags=$tags,pKey=$prId,ct=$creationTime)" + } +} + +/** :: DeveloperApi :: + * Utilities for validating [[Event]]s + * + * @group Event Data + */ +@DeveloperApi +object EventValidation { + /** Default time zone is set to UTC */ + val defaultTimeZone = DateTimeZone.UTC + + /** Checks whether an event name contains a reserved prefix + * + * @param name Event name + * @return true if event name starts with \$ or pio_, false otherwise + */ + def isReservedPrefix(name: String): Boolean = name.startsWith("$") || + name.startsWith("pio_") + + /** PredictionIO reserves some single entity event names. They are currently + * \$set, \$unset, and \$delete. 
+ */ + val specialEvents = Set("$set", "$unset", "$delete") + + /** Checks whether an event name is a special PredictionIO event name + * + * @param name Event name + * @return true if the name is a special event, false otherwise + */ + def isSpecialEvents(name: String): Boolean = specialEvents.contains(name) + + /** Validate an [[Event]], throwing exceptions when the candidate violates any + * of the following: + * + * - event name must not be empty + * - entityType must not be empty + * - entityId must not be empty + * - targetEntityType must not be Some of empty + * - targetEntityId must not be Some of empty + * - targetEntityType and targetEntityId must be both Some or None + * - properties must not be empty when event is \$unset + * - event name must be a special event if it has a reserved prefix + * - targetEntityType and targetEntityId must be None if the event name has + * a reserved prefix + * - entityType must be a built-in entity type if entityType has a + * reserved prefix + * - targetEntityType must be a built-in entity type if targetEntityType is + * Some and has a reserved prefix + * + * @param e Event to be validated + */ + def validate(e: Event): Unit = { + + require(!e.event.isEmpty, "event must not be empty.") + require(!e.entityType.isEmpty, "entityType must not be empty string.") + require(!e.entityId.isEmpty, "entityId must not be empty string.") + require(e.targetEntityType.map(!_.isEmpty).getOrElse(true), + "targetEntityType must not be empty string") + require(e.targetEntityId.map(!_.isEmpty).getOrElse(true), + "targetEntityId must not be empty string.") + require(!((e.targetEntityType != None) && (e.targetEntityId == None)), + "targetEntityType and targetEntityId must be specified together.") + require(!((e.targetEntityType == None) && (e.targetEntityId != None)), + "targetEntityType and targetEntityId must be specified together.") + require(!((e.event == "$unset") && e.properties.isEmpty), + "properties cannot be empty for $unset event") 
+ require(!isReservedPrefix(e.event) || isSpecialEvents(e.event), + s"${e.event} is not a supported reserved event name.") + require(!isSpecialEvents(e.event) || + ((e.targetEntityType == None) && (e.targetEntityId == None)), + s"Reserved event ${e.event} cannot have targetEntity") + require(!isReservedPrefix(e.entityType) || + isBuiltinEntityTypes(e.entityType), + s"The entityType ${e.entityType} is not allowed. " + + s"'pio_' is a reserved name prefix.") + require(e.targetEntityType.map{ t => + (!isReservedPrefix(t) || isBuiltinEntityTypes(t))}.getOrElse(true), + s"The targetEntityType ${e.targetEntityType.get} is not allowed. " + + s"'pio_' is a reserved name prefix.") + validateProperties(e) + } + + /** Defines built-in entity types. The current built-in type is pio_pr. */ + val builtinEntityTypes: Set[String] = Set("pio_pr") + + /** Defines built-in properties. This is currently empty. */ + val builtinProperties: Set[String] = Set() + + /** Checks whether an entity type is a built-in entity type */ + def isBuiltinEntityTypes(name: String): Boolean = builtinEntityTypes.contains(name) + + /** Validate event properties, throwing exceptions when the candidate violates + * any of the following: + * + * - property name must not contain a reserved prefix + * + * @param e Event to be validated + */ + def validateProperties(e: Event): Unit = { + e.properties.keySet.foreach { k => + require(!isReservedPrefix(k) || builtinProperties.contains(k), + s"The property ${k} is not allowed. " + + s"'pio_' is a reserved name prefix.") + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala new file mode 100644 index 0000000..7d4fce3 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala @@ -0,0 +1,236 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.data.{Utils => DataUtils} +import org.joda.time.DateTime +import org.json4s._ +import scala.util.{Try, Success, Failure} + +/** :: DeveloperApi :: + * Support library for dealing with [[Event]] and JSON4S + * + * @group Event Data + */ +@DeveloperApi +object EventJson4sSupport { + /** This is set to org.json4s.DefaultFormats. 
Do not use JSON4S to serialize + * or deserialize Joda-Time DateTime because it has some issues with timezone + * (as of version 3.2.10) + */ + implicit val formats = DefaultFormats + + /** :: DeveloperApi :: + * Convert JSON from Event Server to [[Event]] + * + * @return deserialization routine used by [[APISerializer]] + */ + @DeveloperApi + def readJson: PartialFunction[JValue, Event] = { + case JObject(x) => { + val fields = new DataMap(x.toMap) + // use get() if required in json + // use getOpt() if not required in json + try { + val event = fields.get[String]("event") + val entityType = fields.get[String]("entityType") + val entityId = fields.get[String]("entityId") + val targetEntityType = fields.getOpt[String]("targetEntityType") + val targetEntityId = fields.getOpt[String]("targetEntityId") + val properties = fields.getOrElse[Map[String, JValue]]( + "properties", Map()) + // default currentTime expressed as UTC timezone + lazy val currentTime = DateTime.now(EventValidation.defaultTimeZone) + val eventTime = fields.getOpt[String]("eventTime") + .map{ s => + try { + DataUtils.stringToDateTime(s) + } catch { + case _: Exception => + throw new MappingException(s"Fail to extract eventTime ${s}") + } + }.getOrElse(currentTime) + + // disable tags from API for now. + val tags = List() + // val tags = fields.getOpt[Seq[String]]("tags").getOrElse(List()) + + val prId = fields.getOpt[String]("prId") + + // don't allow user set creationTime from API for now. 
+ val creationTime = currentTime + // val creationTime = fields.getOpt[String]("creationTime") + // .map{ s => + // try { + // DataUtils.stringToDateTime(s) + // } catch { + // case _: Exception => + // throw new MappingException(s"Fail to extract creationTime ${s}") + // } + // }.getOrElse(currentTime) + + + val newEvent = Event( + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = DataMap(properties), + eventTime = eventTime, + prId = prId, + creationTime = creationTime + ) + EventValidation.validate(newEvent) + newEvent + } catch { + case e: Exception => throw new MappingException(e.toString, e) + } + } + } + + /** :: DeveloperApi :: + * Convert [[Event]] to JSON for use by the Event Server + * + * @return serialization routine used by [[APISerializer]] + */ + @DeveloperApi + def writeJson: PartialFunction[Any, JValue] = { + case d: Event => { + JObject( + JField("eventId", + d.eventId.map( eid => JString(eid)).getOrElse(JNothing)) :: + JField("event", JString(d.event)) :: + JField("entityType", JString(d.entityType)) :: + JField("entityId", JString(d.entityId)) :: + JField("targetEntityType", + d.targetEntityType.map(JString(_)).getOrElse(JNothing)) :: + JField("targetEntityId", + d.targetEntityId.map(JString(_)).getOrElse(JNothing)) :: + JField("properties", d.properties.toJObject) :: + JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) :: + // disable tags from API for now + // JField("tags", JArray(d.tags.toList.map(JString(_)))) :: + // disable tags from API for now + JField("prId", + d.prId.map(JString(_)).getOrElse(JNothing)) :: + // don't show creationTime for now + JField("creationTime", + JString(DataUtils.dateTimeToString(d.creationTime))) :: + Nil) + } + } + + /** :: DeveloperApi :: + * Convert JSON4S JValue to [[Event]] + * + * @return deserialization routine used by [[DBSerializer]] + */ + @DeveloperApi + def 
deserializeFromJValue: PartialFunction[JValue, Event] = { + case jv: JValue => { + val event = (jv \ "event").extract[String] + val entityType = (jv \ "entityType").extract[String] + val entityId = (jv \ "entityId").extract[String] + val targetEntityType = (jv \ "targetEntityType").extract[Option[String]] + val targetEntityId = (jv \ "targetEntityId").extract[Option[String]] + val properties = (jv \ "properties").extract[JObject] + val eventTime = DataUtils.stringToDateTime( + (jv \ "eventTime").extract[String]) + val tags = (jv \ "tags").extract[Seq[String]] + val prId = (jv \ "prId").extract[Option[String]] + val creationTime = DataUtils.stringToDateTime( + (jv \ "creationTime").extract[String]) + Event( + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = DataMap(properties), + eventTime = eventTime, + tags = tags, + prId = prId, + creationTime = creationTime) + } + } + + /** :: DeveloperApi :: + * Convert [[Event]] to JSON4S JValue + * + * @return serialization routine used by [[DBSerializer]] + */ + @DeveloperApi + def serializeToJValue: PartialFunction[Any, JValue] = { + case d: Event => { + JObject( + JField("event", JString(d.event)) :: + JField("entityType", JString(d.entityType)) :: + JField("entityId", JString(d.entityId)) :: + JField("targetEntityType", + d.targetEntityType.map(JString(_)).getOrElse(JNothing)) :: + JField("targetEntityId", + d.targetEntityId.map(JString(_)).getOrElse(JNothing)) :: + JField("properties", d.properties.toJObject) :: + JField("eventTime", JString(DataUtils.dateTimeToString(d.eventTime))) :: + JField("tags", JArray(d.tags.toList.map(JString(_)))) :: + JField("prId", + d.prId.map(JString(_)).getOrElse(JNothing)) :: + JField("creationTime", + JString(DataUtils.dateTimeToString(d.creationTime))) :: + Nil) + } + } + + /** :: DeveloperApi :: + * Custom JSON4S serializer for [[Event]] intended to be used by database + * access, 
or anywhere that demands serdes of [[Event]] to/from JSON4S JValue + */ + @DeveloperApi + class DBSerializer extends CustomSerializer[Event](format => ( + deserializeFromJValue, serializeToJValue)) + + /** :: DeveloperApi :: + * Custom JSON4S serializer for [[Event]] intended to be used by the Event + * Server, or anywhere that demands serdes of [[Event]] to/from JSON + */ + @DeveloperApi + class APISerializer extends CustomSerializer[Event](format => ( + readJson, writeJson)) +} + + +@DeveloperApi +object BatchEventsJson4sSupport { + implicit val formats = DefaultFormats + + @DeveloperApi + def readJson: PartialFunction[JValue, Seq[Try[Event]]] = { + case JArray(events) => { + events.map { event => + try { + Success(EventJson4sSupport.readJson(event)) + } catch { + case e: Exception => Failure(e) + } + } + } + } + + @DeveloperApi + class APISerializer extends CustomSerializer[Seq[Try[Event]]](format => (readJson, Map.empty)) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala new file mode 100644 index 0000000..6836c6d --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEventAggregator.scala @@ -0,0 +1,145 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi +import org.joda.time.DateTime + +/** :: DeveloperApi :: + * Provides aggregation support of [[Event]]s to [[LEvents]]. Engine developers + * should use [[org.apache.predictionio.data.store.LEventStore]] instead of using this + * directly. + * + * @group Event Data + */ +@DeveloperApi +object LEventAggregator { + /** :: DeveloperApi :: + * Aggregate all properties grouped by entity type given an iterator of + * [[Event]]s with the latest property values from all [[Event]]s, and their + * first and last updated time + * + * @param events An iterator of [[Event]]s whose properties will be aggregated + * @return A map of entity type to [[PropertyMap]] + */ + @DeveloperApi + def aggregateProperties(events: Iterator[Event]): Map[String, PropertyMap] = { + events.toList + .groupBy(_.entityId) + .mapValues(_.sortBy(_.eventTime.getMillis) + .foldLeft[Prop](Prop())(propAggregator)) + .filter{ case (k, v) => v.dm.isDefined } + .mapValues{ v => + require(v.firstUpdated.isDefined, + "Unexpected Error: firstUpdated cannot be None.") + require(v.lastUpdated.isDefined, + "Unexpected Error: lastUpdated cannot be None.") + + PropertyMap( + fields = v.dm.get.fields, + firstUpdated = v.firstUpdated.get, + lastUpdated = v.lastUpdated.get + ) + } + } + + /** :: DeveloperApi :: + * Aggregate all properties given an iterator of [[Event]]s with the latest + * property values from all [[Event]]s, and their first and last updated time + * + * @param events An 
iterator of [[Event]]s whose properties will be aggregated + * @return An optional [[PropertyMap]] + */ + @DeveloperApi + def aggregatePropertiesSingle(events: Iterator[Event]) + : Option[PropertyMap] = { + val prop = events.toList + .sortBy(_.eventTime.getMillis) + .foldLeft[Prop](Prop())(propAggregator) + + prop.dm.map{ d => + require(prop.firstUpdated.isDefined, + "Unexpected Error: firstUpdated cannot be None.") + require(prop.lastUpdated.isDefined, + "Unexpected Error: lastUpdated cannot be None.") + + PropertyMap( + fields = d.fields, + firstUpdated = prop.firstUpdated.get, + lastUpdated = prop.lastUpdated.get + ) + } + } + + /** Event names that control aggregation: \$set, \$unset, and \$delete */ + val eventNames = List("$set", "$unset", "$delete") + + private + def dataMapAggregator: ((Option[DataMap], Event) => Option[DataMap]) = { + (p, e) => { + e.event match { + case "$set" => { + if (p == None) { + Some(e.properties) + } else { + p.map(_ ++ e.properties) + } + } + case "$unset" => { + if (p == None) { + None + } else { + p.map(_ -- e.properties.keySet) + } + } + case "$delete" => None + case _ => p // do nothing for others + } + } + } + + private + def propAggregator: ((Prop, Event) => Prop) = { + (p, e) => { + e.event match { + case "$set" | "$unset" | "$delete" => { + Prop( + dm = dataMapAggregator(p.dm, e), + firstUpdated = p.firstUpdated.map { t => + first(t, e.eventTime) + }.orElse(Some(e.eventTime)), + lastUpdated = p.lastUpdated.map { t => + last(t, e.eventTime) + }.orElse(Some(e.eventTime)) + ) + } + case _ => p // do nothing for others + } + } + } + + private + def first(a: DateTime, b: DateTime): DateTime = if (b.isBefore(a)) b else a + + private + def last(a: DateTime, b: DateTime): DateTime = if (b.isAfter(a)) b else a + + private case class Prop( + dm: Option[DataMap] = None, + firstUpdated: Option[DateTime] = None, + lastUpdated: Option[DateTime] = None + ) +} 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala new file mode 100644 index 0000000..d6e753c --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala @@ -0,0 +1,489 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.annotation.Experimental + +import scala.concurrent.Future +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.concurrent.ExecutionContext +import scala.concurrent.TimeoutException + +import org.joda.time.DateTime + +/** :: DeveloperApi :: + * Base trait of a data access object that directly returns [[Event]] without + * going through Spark's parallelization. Engine developers should use + * [[org.apache.predictionio.data.store.LEventStore]] instead of using this directly. 
+ * + * @group Event Data + */ +@DeveloperApi +trait LEvents { + /** Default timeout for asynchronous operations that is set to 1 minute */ + val defaultTimeout = Duration(60, "seconds") + + /** :: DeveloperApi :: + * Initialize Event Store for an app ID and optionally a channel ID. + * This routine is to be called when an app is first created. + * + * @param appId App ID + * @param channelId Optional channel ID + * @return true if initialization was successful; false otherwise. + */ + @DeveloperApi + def init(appId: Int, channelId: Option[Int] = None): Boolean + + /** :: DeveloperApi :: + * Remove Event Store for an app ID and optional channel ID. + * + * @param appId App ID + * @param channelId Optional channel ID + * @return true if removal was successful; false otherwise. + */ + @DeveloperApi + def remove(appId: Int, channelId: Option[Int] = None): Boolean + + /** :: DeveloperApi :: + * Close this Event Store interface object, e.g. close connection, release + * resources, etc. + */ + @DeveloperApi + def close(): Unit + + /** :: DeveloperApi :: + * Insert an [[Event]] in a non-blocking fashion. + * + * @param event An [[Event]] to be inserted + * @param appId App ID for the [[Event]] to be inserted to + */ + @DeveloperApi + def futureInsert(event: Event, appId: Int)(implicit ec: ExecutionContext): + Future[String] = futureInsert(event, appId, None) + + /** :: DeveloperApi :: + * Insert an [[Event]] in a non-blocking fashion. + * + * @param event An [[Event]] to be inserted + * @param appId App ID for the [[Event]] to be inserted to + * @param channelId Optional channel ID for the [[Event]] to be inserted to + */ + @DeveloperApi + def futureInsert( + event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] + + /** :: DeveloperApi :: + * Get an [[Event]] in a non-blocking fashion. 
+ * + * @param eventId ID of the [[Event]] + * @param appId ID of the app that contains the [[Event]] + */ + @DeveloperApi + def futureGet(eventId: String, appId: Int)(implicit ec: ExecutionContext): + Future[Option[Event]] = futureGet(eventId, appId, None) + + /** :: DeveloperApi :: + * Get an [[Event]] in a non-blocking fashion. + * + * @param eventId ID of the [[Event]] + * @param appId ID of the app that contains the [[Event]] + * @param channelId Optional channel ID that contains the [[Event]] + */ + @DeveloperApi + def futureGet( + eventId: String, + appId: Int, + channelId: Option[Int] + )(implicit ec: ExecutionContext): Future[Option[Event]] + + /** :: DeveloperApi :: + * Delete an [[Event]] in a non-blocking fashion. + * + * @param eventId ID of the [[Event]] + * @param appId ID of the app that contains the [[Event]] + */ + @DeveloperApi + def futureDelete(eventId: String, appId: Int)(implicit ec: ExecutionContext): + Future[Boolean] = futureDelete(eventId, appId, None) + + /** :: DeveloperApi :: + * Delete an [[Event]] in a non-blocking fashion. + * + * @param eventId ID of the [[Event]] + * @param appId ID of the app that contains the [[Event]] + * @param channelId Optional channel ID that contains the [[Event]] + */ + @DeveloperApi + def futureDelete( + eventId: String, + appId: Int, + channelId: Option[Int] + )(implicit ec: ExecutionContext): Future[Boolean] + + /** :: DeveloperApi :: + * Reads from database and returns a Future of Iterator of [[Event]]s. + * + * @param appId return events of this app ID + * @param channelId return events of this channel ID (default channel if it's None) + * @param startTime return events with eventTime >= startTime + * @param untilTime return events with eventTime < untilTime + * @param entityType return events of this entityType + * @param entityId return events of this entityId + * @param eventNames return events with any of these event names. 
+ * @param targetEntityType return events of this targetEntityType: + * - None means no restriction on targetEntityType + * - Some(None) means no targetEntityType for this event + * - Some(Some(x)) means targetEntityType should match x. + * @param targetEntityId return events of this targetEntityId + * - None means no restriction on targetEntityId + * - Some(None) means no targetEntityId for this event + * - Some(Some(x)) means targetEntityId should match x. + * @param limit Limit number of events. Get all events if None or Some(-1) + * @param reversed Reverse the order. + * - return oldest events first if None or Some(false) (default) + * - return latest events first if Some(true) + * @param ec ExecutionContext + * @return Future[Iterator[Event]] + */ + @DeveloperApi + def futureFind( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None + )(implicit ec: ExecutionContext): Future[Iterator[Event]] + + /** Aggregate properties of entities based on these special events: + * \$set, \$unset, \$delete events. + * and returns a Future of Map of entityId to properties. 
+ * + * @param appId use events of this app ID + * @param channelId use events of this channel ID (default channel if it's None) + * @param entityType aggregate properties of the entities of this entityType + * @param startTime use events with eventTime >= startTime + * @param untilTime use events with eventTime < untilTime + * @param required only keep entities with these required properties defined + * @param ec ExecutionContext + * @return Future[Map[String, PropertyMap]] + */ + private[prediction] def futureAggregateProperties( + appId: Int, + channelId: Option[Int] = None, + entityType: String, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + required: Option[Seq[String]] = None)(implicit ec: ExecutionContext): + Future[Map[String, PropertyMap]] = { + futureFind( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = Some(entityType), + eventNames = Some(LEventAggregator.eventNames) + ).map{ eventIt => + val dm = LEventAggregator.aggregateProperties(eventIt) + if (required.isDefined) { + dm.filter { case (k, v) => + required.get.map(v.contains(_)).reduce(_ && _) + } + } else dm + } + } + + /** + * :: Experimental :: + * + * Aggregate properties of the specified entity (entityType + entityId) + * based on these special events: + * \$set, \$unset, \$delete events. 
+ * and returns a Future of Option[PropertyMap] + * + * @param appId use events of this app ID + * @param channelId use events of this channel ID (default channel if it's None) + * @param entityType the entityType + * @param entityId the entityId + * @param startTime use events with eventTime >= startTime + * @param untilTime use events with eventTime < untilTime + * @param ec ExecutionContext + * @return Future[Option[PropertyMap]] + */ + @Experimental + private[prediction] def futureAggregatePropertiesOfEntity( + appId: Int, + channelId: Option[Int] = None, + entityType: String, + entityId: String, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None)(implicit ec: ExecutionContext): + Future[Option[PropertyMap]] = { + futureFind( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = Some(entityType), + entityId = Some(entityId), + eventNames = Some(LEventAggregator.eventNames) + ).map{ eventIt => + LEventAggregator.aggregatePropertiesSingle(eventIt) + } + } + + // following is blocking + private[prediction] def insert(event: Event, appId: Int, + channelId: Option[Int] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + String = { + Await.result(futureInsert(event, appId, channelId), timeout) + } + + private[prediction] def get(eventId: String, appId: Int, + channelId: Option[Int] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Option[Event] = { + Await.result(futureGet(eventId, appId, channelId), timeout) + } + + private[prediction] def delete(eventId: String, appId: Int, + channelId: Option[Int] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Boolean = { + Await.result(futureDelete(eventId, appId, channelId), timeout) + } + + /** reads from database and returns events iterator. 
+ * + * @param appId return events of this app ID + * @param channelId return events of this channel ID (default channel if it's None) + * @param startTime return events with eventTime >= startTime + * @param untilTime return events with eventTime < untilTime + * @param entityType return events of this entityType + * @param entityId return events of this entityId + * @param eventNames return events with any of these event names. + * @param targetEntityType return events of this targetEntityType: + * - None means no restriction on targetEntityType + * - Some(None) means no targetEntityType for this event + * - Some(Some(x)) means targetEntityType should match x. + * @param targetEntityId return events of this targetEntityId + * - None means no restriction on targetEntityId + * - Some(None) means no targetEntityId for this event + * - Some(Some(x)) means targetEntityId should match x. + * @param limit Limit number of events. Get all events if None or Some(-1) + * @param reversed Reverse the order (should be used with both + * targetEntityType and targetEntityId specified) + * - return oldest events first if None or Some(false) (default) + * - return latest events first if Some(true) + * @param ec ExecutionContext + * @return Iterator[Event] + */ + private[prediction] def find( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Iterator[Event] = { + Await.result(futureFind( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventNames, + 
targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + limit = limit, + reversed = reversed), timeout) + } + + // NOTE: remove in next release + @deprecated("Use find() instead.", "0.9.2") + private[prediction] def findLegacy( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Either[StorageError, Iterator[Event]] = { + try { + // return Either for legacy usage + Right(Await.result(futureFind( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = eventNames, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + limit = limit, + reversed = reversed), timeout)) + } catch { + case e: TimeoutException => Left(StorageError(s"${e}")) + case e: Exception => Left(StorageError(s"${e}")) + } + } + + /** reads events of the specified entity. + * + * @param appId return events of this app ID + * @param channelId return events of this channel ID (default channel if it's None) + * @param entityType return events of this entityType + * @param entityId return events of this entityId + * @param eventNames return events with any of these event names. + * @param targetEntityType return events of this targetEntityType: + * - None means no restriction on targetEntityType + * - Some(None) means no targetEntityType for this event + * - Some(Some(x)) means targetEntityType should match x. 
+ * @param targetEntityId return events of this targetEntityId + * - None means no restriction on targetEntityId + * - Some(None) means no targetEntityId for this event + * - Some(Some(x)) means targetEntityId should match x. + * @param startTime return events with eventTime >= startTime + * @param untilTime return events with eventTime < untilTime + * @param limit Limit number of events. Get all events if None or Some(-1) + * @param latest Return latest event first (default true) + * @param ec ExecutionContext + * @return Either[StorageError, Iterator[Event]] + */ + // NOTE: remove this function in next release + @deprecated("Use LEventStore.findByEntity() instead.", "0.9.2") + def findSingleEntity( + appId: Int, + channelId: Option[Int] = None, + entityType: String, + entityId: String, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + limit: Option[Int] = None, + latest: Boolean = true, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Either[StorageError, Iterator[Event]] = { + + findLegacy( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = Some(entityType), + entityId = Some(entityId), + eventNames = eventNames, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + limit = limit, + reversed = Some(latest), + timeout = timeout) + + } + + /** Aggregate properties of entities based on these special events: + * \$set, \$unset, \$delete events. + * and returns a Map of entityId to properties. 
+ * + * @param appId use events of this app ID + * @param channelId use events of this channel ID (default channel if it's None) + * @param entityType aggregate properties of the entities of this entityType + * @param startTime use events with eventTime >= startTime + * @param untilTime use events with eventTime < untilTime + * @param required only keep entities with these required properties defined + * @param ec ExecutionContext + * @return Map[String, PropertyMap] + */ + private[prediction] def aggregateProperties( + appId: Int, + channelId: Option[Int] = None, + entityType: String, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + required: Option[Seq[String]] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Map[String, PropertyMap] = { + Await.result(futureAggregateProperties( + appId = appId, + channelId = channelId, + entityType = entityType, + startTime = startTime, + untilTime = untilTime, + required = required), timeout) + } + + /** + * :: Experimental :: + * + * Aggregate properties of the specified entity (entityType + entityId) + * based on these special events: + * \$set, \$unset, \$delete events. 
+ * and returns Option[PropertyMap] + * + * @param appId use events of this app ID + * @param channelId use events of this channel ID + * @param entityType the entityType + * @param entityId the entityId + * @param startTime use events with eventTime >= startTime + * @param untilTime use events with eventTime < untilTime + * @param ec ExecutionContext + * @return Future[Option[PropertyMap]] + */ + @Experimental + private[prediction] def aggregatePropertiesOfEntity( + appId: Int, + channelId: Option[Int] = None, + entityType: String, + entityId: String, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + timeout: Duration = defaultTimeout)(implicit ec: ExecutionContext): + Option[PropertyMap] = { + + Await.result(futureAggregatePropertiesOfEntity( + appId = appId, + channelId = channelId, + entityType = entityType, + entityId = entityId, + startTime = startTime, + untilTime = untilTime), timeout) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala new file mode 100644 index 0000000..15d7444 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Models.scala @@ -0,0 +1,80 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import com.google.common.io.BaseEncoding +import org.apache.predictionio.annotation.DeveloperApi +import org.json4s._ + +/** :: DeveloperApi :: + * Stores model for each engine instance + * + * @param id ID of the model, which should be the same as engine instance ID + * @param models Trained models of all algorithms + * @group Model Data + */ +@DeveloperApi +case class Model( + id: String, + models: Array[Byte]) + +/** :: DeveloperApi :: + * Base trait for of the [[Model]] data access object + * + * @group Model Data + */ +@DeveloperApi +trait Models { + /** Insert a new [[Model]] */ + def insert(i: Model): Unit + + /** Get a [[Model]] by ID */ + def get(id: String): Option[Model] + + /** Delete a [[Model]] */ + def delete(id: String): Unit +} + +/** :: DeveloperApi :: + * JSON4S serializer for [[Model]] + * + * @group Model Data + */ +@DeveloperApi +class ModelSerializer extends CustomSerializer[Model]( + format => ({ + case JObject(fields) => + implicit val formats = DefaultFormats + val seed = Model( + id = "", + models = Array[Byte]()) + fields.foldLeft(seed) { case (i, field) => + field match { + case JField("id", JString(id)) => i.copy(id = id) + case JField("models", JString(models)) => + i.copy(models = BaseEncoding.base64.decode(models)) + case _ => i + } + } + }, + { + case i: Model => + JObject( + JField("id", JString(i.id)) :: + JField("models", JString(BaseEncoding.base64.encode(i.models))) :: + Nil) + } +)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala 
b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala new file mode 100644 index 0000000..72287dd --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala @@ -0,0 +1,209 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.joda.time.DateTime + +import org.json4s.JValue + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +// each JValue data associated with the time it is set +private[prediction] case class PropTime(val d: JValue, val t: Long) + extends Serializable + +private[prediction] case class SetProp ( + val fields: Map[String, PropTime], + // last set time. 
Note: fields could be empty with valid set time + val t: Long) extends Serializable { + + def ++ (that: SetProp): SetProp = { + val commonKeys = fields.keySet.intersect(that.fields.keySet) + + val common: Map[String, PropTime] = commonKeys.map { k => + val thisData = this.fields(k) + val thatData = that.fields(k) + // only keep the value with latest time + val v = if (thisData.t > thatData.t) thisData else thatData + (k, v) + }.toMap + + val combinedFields = common ++ + (this.fields -- commonKeys) ++ (that.fields -- commonKeys) + + // keep the latest set time + val combinedT = if (this.t > that.t) this.t else that.t + + SetProp( + fields = combinedFields, + t = combinedT + ) + } +} + +private[prediction] case class UnsetProp (fields: Map[String, Long]) + extends Serializable { + def ++ (that: UnsetProp): UnsetProp = { + val commonKeys = fields.keySet.intersect(that.fields.keySet) + + val common: Map[String, Long] = commonKeys.map { k => + val thisData = this.fields(k) + val thatData = that.fields(k) + // only keep the value with latest time + val v = if (thisData > thatData) thisData else thatData + (k, v) + }.toMap + + val combinedFields = common ++ + (this.fields -- commonKeys) ++ (that.fields -- commonKeys) + + UnsetProp( + fields = combinedFields + ) + } +} + +private[prediction] case class DeleteEntity (t: Long) extends Serializable { + def ++ (that: DeleteEntity): DeleteEntity = { + if (this.t > that.t) this else that + } +} + +private[prediction] case class EventOp ( + val setProp: Option[SetProp] = None, + val unsetProp: Option[UnsetProp] = None, + val deleteEntity: Option[DeleteEntity] = None, + val firstUpdated: Option[DateTime] = None, + val lastUpdated: Option[DateTime] = None +) extends Serializable { + + def ++ (that: EventOp): EventOp = { + val firstUp = (this.firstUpdated ++ that.firstUpdated).reduceOption{ + (a, b) => if (b.getMillis < a.getMillis) b else a + } + val lastUp = (this.lastUpdated ++ that.lastUpdated).reduceOption { + (a, b) => if 
(b.getMillis > a.getMillis) b else a + } + + EventOp( + setProp = (setProp ++ that.setProp).reduceOption(_ ++ _), + unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _), + deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _), + firstUpdated = firstUp, + lastUpdated = lastUp + ) + } + + def toPropertyMap(): Option[PropertyMap] = { + setProp.flatMap { set => + + val unsetKeys: Set[String] = unsetProp.map( unset => + unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet + ).getOrElse(Set()) + + val combinedFields = deleteEntity.map { delete => + if (delete.t >= set.t) { + None + } else { + val deleteKeys: Set[String] = set.fields + .filter { case (k, PropTime(kv, t)) => + (delete.t >= t) + }.keySet + Some(set.fields -- unsetKeys -- deleteKeys) + } + }.getOrElse{ + Some(set.fields -- unsetKeys) + } + + // Note: mapValues() doesn't return concrete Map and causes + // NotSerializableException issue. Use map(identity) to work around this. + // see https://issues.scala-lang.org/browse/SI-7005 + combinedFields.map{ f => + require(firstUpdated.isDefined, + "Unexpected Error: firstUpdated cannot be None.") + require(lastUpdated.isDefined, + "Unexpected Error: lastUpdated cannot be None.") + PropertyMap( + fields = f.mapValues(_.d).map(identity), + firstUpdated = firstUpdated.get, + lastUpdated = lastUpdated.get + ) + } + } + } + +} + +private[prediction] object EventOp { + // create EventOp from Event object + def apply(e: Event): EventOp = { + val t = e.eventTime.getMillis + e.event match { + case "$set" => { + val fields = e.properties.fields.mapValues(jv => + PropTime(jv, t) + ).map(identity) + + EventOp( + setProp = Some(SetProp(fields = fields, t = t)), + firstUpdated = Some(e.eventTime), + lastUpdated = Some(e.eventTime) + ) + } + case "$unset" => { + val fields = e.properties.fields.mapValues(jv => t).map(identity) + EventOp( + unsetProp = Some(UnsetProp(fields = fields)), + firstUpdated = Some(e.eventTime), + lastUpdated = 
Some(e.eventTime) + ) + } + case "$delete" => { + EventOp( + deleteEntity = Some(DeleteEntity(t)), + firstUpdated = Some(e.eventTime), + lastUpdated = Some(e.eventTime) + ) + } + case _ => { + EventOp() + } + } + } +} + + +private[prediction] object PEventAggregator { + + val eventNames = List("$set", "$unset", "$delete") + + def aggregateProperties(eventsRDD: RDD[Event]): RDD[(String, PropertyMap)] = { + eventsRDD + .map( e => (e.entityId, EventOp(e) )) + .aggregateByKey[EventOp](EventOp())( + // within same partition + seqOp = { case (u, v) => u ++ v }, + // across partition + combOp = { case (accu, u) => accu ++ u } + ) + .mapValues(_.toPropertyMap) + .filter{ case (k, v) => v.isDefined } + .map{ case (k, v) => (k, v.get) } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala new file mode 100644 index 0000000..49e5a5e --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEvents.scala @@ -0,0 +1,182 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage + +import grizzled.slf4j.Logger +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.annotation.Experimental +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.joda.time.DateTime + +import scala.reflect.ClassTag + +/** :: DeveloperApi :: + * Base trait of a data access object that returns [[Event]] related RDD data + * structure. Engine developers should use + * [[org.apache.predictionio.data.store.PEventStore]] instead of using this directly. + * + * @group Event Data + */ +@DeveloperApi +trait PEvents extends Serializable { + @transient protected lazy val logger = Logger[this.type] + @deprecated("Use PEventStore.find() instead.", "0.9.2") + def getByAppIdAndTimeAndEntity(appId: Int, + startTime: Option[DateTime], + untilTime: Option[DateTime], + entityType: Option[String], + entityId: Option[String])(sc: SparkContext): RDD[Event] = { + find( + appId = appId, + startTime = startTime, + untilTime = untilTime, + entityType = entityType, + entityId = entityId, + eventNames = None + )(sc) + } + + /** :: DeveloperApi :: + * Read from database and return the events. The deprecation here is intended + * to engine developers only. + * + * @param appId return events of this app ID + * @param channelId return events of this channel ID (default channel if it's None) + * @param startTime return events with eventTime >= startTime + * @param untilTime return events with eventTime < untilTime + * @param entityType return events of this entityType + * @param entityId return events of this entityId + * @param eventNames return events with any of these event names. + * @param targetEntityType return events of this targetEntityType: + * - None means no restriction on targetEntityType + * - Some(None) means no targetEntityType for this event + * - Some(Some(x)) means targetEntityType should match x. 
+ * @param targetEntityId return events of this targetEntityId + * - None means no restriction on targetEntityId + * - Some(None) means no targetEntityId for this event + * - Some(Some(x)) means targetEntityId should match x. + * @param sc Spark context + * @return RDD[Event] + */ + @deprecated("Use PEventStore.find() instead.", "0.9.2") + @DeveloperApi + def find( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] + + /** Aggregate properties of entities based on these special events: + * \$set, \$unset, \$delete events. The deprecation here is intended to + * engine developers only. + * + * @param appId use events of this app ID + * @param channelId use events of this channel ID (default channel if it's None) + * @param entityType aggregate properties of the entities of this entityType + * @param startTime use events with eventTime >= startTime + * @param untilTime use events with eventTime < untilTime + * @param required only keep entities with these required properties defined + * @param sc Spark context + * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair + */ + @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2") + def aggregateProperties( + appId: Int, + channelId: Option[Int] = None, + entityType: String, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + required: Option[Seq[String]] = None) + (sc: SparkContext): RDD[(String, PropertyMap)] = { + val eventRDD = find( + appId = appId, + channelId = channelId, + startTime = startTime, + untilTime = untilTime, + entityType = Some(entityType), + eventNames = Some(PEventAggregator.eventNames))(sc) + + val dmRDD = 
PEventAggregator.aggregateProperties(eventRDD) + + required map { r => + dmRDD.filter { case (k, v) => + r.map(v.contains(_)).reduce(_ && _) + } + } getOrElse dmRDD + } + + /** :: Experimental :: + * Extract EntityMap[A] from events for the entityType + * NOTE: it is local EntityMap[A] + */ + @deprecated("Use PEventStore.aggregateProperties() instead.", "0.9.2") + @Experimental + def extractEntityMap[A: ClassTag]( + appId: Int, + entityType: String, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + required: Option[Seq[String]] = None) + (sc: SparkContext)(extract: DataMap => A): EntityMap[A] = { + val idToData: Map[String, A] = aggregateProperties( + appId = appId, + entityType = entityType, + startTime = startTime, + untilTime = untilTime, + required = required + )(sc).map{ case (id, dm) => + try { + (id, extract(dm)) + } catch { + case e: Exception => { + logger.error(s"Failed to get extract entity from DataMap $dm of " + + s"entityId $id.", e) + throw e + } + } + }.collectAsMap.toMap + + new EntityMap(idToData) + } + + /** :: DeveloperApi :: + * Write events to database + * + * @param events RDD of Event + * @param appId the app ID + * @param sc Spark Context + */ + @DeveloperApi + def write(events: RDD[Event], appId: Int)(sc: SparkContext): Unit = + write(events, appId, None)(sc) + + /** :: DeveloperApi :: + * Write events to database + * + * @param events RDD of Event + * @param appId the app ID + * @param channelId channel ID (default channel if it's None) + * @param sc Spark Context + */ + @DeveloperApi + def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala 
b/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala new file mode 100644 index 0000000..9935558 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/PropertyMap.scala @@ -0,0 +1,96 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.predictionio.data.storage + +import org.joda.time.DateTime + +import org.json4s.JValue +import org.json4s.JObject +import org.json4s.native.JsonMethods.parse + +/** A PropertyMap stores aggregated properties of the entity. + * Internally it is a Map + * whose keys are property names and values are corresponding JSON values + * respectively. Use the get() method to retrieve the value of mandatory + * property or use getOpt() to retrieve the value of the optional property. 
+ * + * @param fields Map of property name to JValue + * @param firstUpdated first updated time of this PropertyMap + * @param lastUpdated last updated time of this PropertyMap + */ +class PropertyMap( + fields: Map[String, JValue], + val firstUpdated: DateTime, + val lastUpdated: DateTime +) extends DataMap(fields) { + + override + def toString: String = s"PropertyMap(${fields}, ${firstUpdated}, ${lastUpdated})" + + override + def hashCode: Int = + 41 * ( + 41 * ( + 41 + fields.hashCode + ) + firstUpdated.hashCode + ) + lastUpdated.hashCode + + override + def equals(other: Any): Boolean = other match { + case that: PropertyMap => { + (that.canEqual(this)) && + (super.equals(that)) && + (this.firstUpdated.equals(that.firstUpdated)) && + (this.lastUpdated.equals(that.lastUpdated)) + } + case that: DataMap => { // for testing purpose + super.equals(that) + } + case _ => false + } + + override + def canEqual(other: Any): Boolean = other.isInstanceOf[PropertyMap] +} + +/** Companion object of the [[PropertyMap]] class. */ +object PropertyMap { + + /** Create an PropertyMap from a Map of String to JValue, + * firstUpdated and lastUpdated time. + * + * @param fields a Map of String to JValue + * @param firstUpdated First updated time + * @param lastUpdated Last updated time + * @return a new PropertyMap + */ + def apply(fields: Map[String, JValue], + firstUpdated: DateTime, lastUpdated: DateTime): PropertyMap = + new PropertyMap(fields, firstUpdated, lastUpdated) + + /** Create an PropertyMap from a JSON String and firstUpdated and lastUpdated + * time. + * @param js JSON String. 
eg """{ "a": 1, "b": "foo" }""" + * @param firstUpdated First updated time + * @param lastUpdated Last updated time + * @return a new PropertyMap + */ + def apply(js: String, firstUpdated: DateTime, lastUpdated: DateTime) + : PropertyMap = apply( + fields = parse(js).asInstanceOf[JObject].obj.toMap, + firstUpdated = firstUpdated, + lastUpdated = lastUpdated + ) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala new file mode 100644 index 0000000..1f170be --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala @@ -0,0 +1,403 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import java.lang.reflect.InvocationTargetException + +import grizzled.slf4j.Logging +import org.apache.predictionio.annotation.DeveloperApi + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.language.existentials +import scala.reflect.runtime.universe._ + +/** :: DeveloperApi :: + * Any storage backend drivers will need to implement this trait with exactly + * '''StorageClient''' as the class name. 
PredictionIO storage layer will look + * for this class when it instantiates the actual backend for use by higher + * level storage access APIs. + * + * @group Storage System + */ +@DeveloperApi +trait BaseStorageClient { + /** Configuration of the '''StorageClient''' */ + val config: StorageClientConfig + + /** The actual client object. This could be a database connection or any kind + * of database access object. + */ + val client: AnyRef + + /** Set a prefix for storage class discovery. As an example, if this prefix + * is set as ''JDBC'', when the storage layer instantiates an implementation + * of [[Apps]], it will try to look for a class named ''JDBCApps''. + */ + val prefix: String = "" +} + +/** :: DeveloperApi :: + * A wrapper of storage client configuration that will be populated by + * PredictionIO automatically, and passed to the StorageClient during + * instantiation. + * + * @param parallel This is set to true by PredictionIO when the storage client + * is instantiated in a parallel data source. + * @param test This is set to true by PredictionIO when tests are being run. + * @param properties This is populated by PredictionIO automatically from + * environmental configuration variables. If you have these + * variables, + * - PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc + * - PIO_STORAGE_SOURCES_PGSQL_USERNAME=abc + * - PIO_STOARGE_SOURCES_PGSQL_PASSWORD=xyz + * + * this field will be filled as a map of string to string: + * - TYPE -> jdbc + * - USERNAME -> abc + * - PASSWORD -> xyz + * + * @group Storage System + */ +@DeveloperApi +case class StorageClientConfig( + parallel: Boolean = false, // parallelized access (RDD)? 
+ test: Boolean = false, // test mode config + properties: Map[String, String] = Map()) + +/** :: DeveloperApi :: + * Thrown when a StorageClient runs into an exceptional condition + * + * @param message Exception error message + * @param cause The underlying exception that caused the exception + * @group Storage System + */ +@DeveloperApi +class StorageClientException(message: String, cause: Throwable) + extends RuntimeException(message, cause) + +@deprecated("Use StorageException", "0.9.2") +private[prediction] case class StorageError(message: String) + +/** :: DeveloperApi :: + * Thrown by data access objects when they run into exceptional conditions + * + * @param message Exception error message + * @param cause The underlying exception that caused the exception + * + * @group Storage System + */ +@DeveloperApi +class StorageException(message: String, cause: Throwable) + extends Exception(message, cause) { + + def this(message: String) = this(message, null) +} + +/** Backend-agnostic data storage layer with lazy initialization. Use this + * object when you need to interface with Event Store in your engine. + * + * @group Storage System + */ +object Storage extends Logging { + private case class ClientMeta( + sourceType: String, + client: BaseStorageClient, + config: StorageClientConfig) + + private case class DataObjectMeta(sourceName: String, namespace: String) + + private var errors = 0 + + private val sourcesPrefix = "PIO_STORAGE_SOURCES" + + private val sourceTypesRegex = """PIO_STORAGE_SOURCES_([^_]+)_TYPE""".r + + private val sourceKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k => + sourceTypesRegex findFirstIn k match { + case Some(sourceTypesRegex(sourceType)) => Seq(sourceType) + case None => Nil + } + } + + if (sourceKeys.size == 0) warn("There is no properly configured data source.") + + private val s2cm = scala.collection.mutable.Map[String, Option[ClientMeta]]() + + /** Reference to the app data repository. 
*/ + private val EventDataRepository = "EVENTDATA" + private val ModelDataRepository = "MODELDATA" + private val MetaDataRepository = "METADATA" + + private val repositoriesPrefix = "PIO_STORAGE_REPOSITORIES" + + private val repositoryNamesRegex = + """PIO_STORAGE_REPOSITORIES_([^_]+)_NAME""".r + + private val repositoryKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k => + repositoryNamesRegex findFirstIn k match { + case Some(repositoryNamesRegex(repositoryName)) => Seq(repositoryName) + case None => Nil + } + } + + if (repositoryKeys.size == 0) { + warn("There is no properly configured repository.") + } + + private val requiredRepositories = Seq(MetaDataRepository) + + requiredRepositories foreach { r => + if (!repositoryKeys.contains(r)) { + error(s"Required repository (${r}) configuration is missing.") + errors += 1 + } + } + private val repositoriesToDataObjectMeta: Map[String, DataObjectMeta] = + repositoryKeys.map(r => + try { + val keyedPath = repositoriesPrefixPath(r) + val name = sys.env(prefixPath(keyedPath, "NAME")) + val sourceName = sys.env(prefixPath(keyedPath, "SOURCE")) + if (sourceKeys.contains(sourceName)) { + r -> DataObjectMeta( + sourceName = sourceName, + namespace = name) + } else { + error(s"$sourceName is not a configured storage source.") + r -> DataObjectMeta("", "") + } + } catch { + case e: Throwable => + error(e.getMessage) + errors += 1 + r -> DataObjectMeta("", "") + } + ).toMap + + if (errors > 0) { + error(s"There were $errors configuration errors. 
Exiting.")
    sys.exit(errors)
  }

  // End of constructor and field definitions and begin method definitions

  /** Joins a prefix and a body with an underscore, forming an
    * environment-variable-style key (e.g. "PIO_STORAGE_SOURCES" + "MYSQL").
    */
  private def prefixPath(prefix: String, body: String) = s"${prefix}_$body"

  /** Builds a key under the data-sources prefix (`sourcesPrefix` is declared
    * earlier in this object, outside this view).
    */
  private def sourcesPrefixPath(body: String) = prefixPath(sourcesPrefix, body)

  /** Builds a key under the repositories prefix. */
  private def repositoriesPrefixPath(body: String) =
    prefixPath(repositoriesPrefix, body)

  /** Returns the cached client metadata for a source, initializing it on first
    * use. Parallel (RDD-based) clients are cached under a "parallel-" key so
    * they do not collide with local clients of the same source.
    */
  private def sourcesToClientMeta(
    source: String,
    parallel: Boolean,
    test: Boolean): Option[ClientMeta] = {
    val sourceName = if (parallel) s"parallel-$source" else source
    s2cm.getOrElseUpdate(sourceName, updateS2CM(source, parallel, test))
  }

  /** Reflectively instantiates the StorageClient of the given backend package.
    *
    * Tries org.apache.predictionio.data.storage.<pkg>.StorageClient first and
    * falls back to <pkg>.StorageClient so custom backends can supply a fully
    * qualified package name. An InvocationTargetException is unwrapped so
    * callers see the real initialization failure.
    */
  private def getClient(
    clientConfig: StorageClientConfig,
    pkg: String): BaseStorageClient = {
    val className = "org.apache.predictionio.data.storage." + pkg + ".StorageClient"
    try {
      Class.forName(className).getConstructors()(0).newInstance(clientConfig).
        asInstanceOf[BaseStorageClient]
    } catch {
      case e: ClassNotFoundException =>
        // Fall back to treating pkg as a complete package name.
        val originalClassName = pkg + ".StorageClient"
        Class.forName(originalClassName).getConstructors()(0).
          newInstance(clientConfig).asInstanceOf[BaseStorageClient]
      case e: java.lang.reflect.InvocationTargetException =>
        throw e.getCause
    }
  }

  /** Get the StorageClient config data from PIO Framework's environment variables */
  def getConfig(sourceName: String): Option[StorageClientConfig] = {
    // s2cm maps a source name to Option[ClientMeta]; flatten collapses both
    // "never initialized" and "initialization failed (cached None)" to None.
    s2cm.get(sourceName).flatten.map(_.config)
  }

  /** Builds a source's StorageClientConfig from environment variables named
    * under `sourcesPrefixPath(k)` and initializes its storage client.
    *
    * @return Some(ClientMeta) on success; None on failure, after logging the
    *         error and incrementing the object-level error counter
    */
  private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
    Option[ClientMeta] = {
    try {
      val keyedPath = sourcesPrefixPath(k)
      val sourceType = sys.env(prefixPath(keyedPath, "TYPE"))
      // Collect every env var under this source's prefix, with the prefix stripped.
      val props = sys.env.filter(t => t._1.startsWith(keyedPath)).map(
        t => t._1.replace(s"${keyedPath}_", "") -> t._2)
      val clientConfig = StorageClientConfig(
        properties = props,
        parallel = parallel,
        test = test)
      val client = getClient(clientConfig, sourceType)
      Some(ClientMeta(sourceType, client, clientConfig))
    } catch {
      // NOTE(review): deliberately catches Throwable so any backend failure is
      // reported and counted instead of aborting startup; consider NonFatal so
      // fatal VM errors still propagate.
      case e: Throwable =>
        error(s"Error initializing storage client for source ${k}", e)
        errors += 1
        None
    }
  }

  // NOTE: qualifier fixed from private[prediction] — after the repackaging to
  // org.apache.predictionio there is no enclosing package named "prediction",
  // so private[prediction] fails to compile.
  /** Resolves a repository name to its configured data object implementation. */
  private[predictionio]
  def getDataObjectFromRepo[T](repo: String, test: Boolean = false)
    (implicit tag: TypeTag[T]): T = {
    val repoDOMeta = repositoriesToDataObjectMeta(repo)
    val repoDOSourceName = repoDOMeta.sourceName
    getDataObject[T](repoDOSourceName, repoDOMeta.namespace, test = test)
  }

  /** Resolves a repository name to its parallel (RDD-based) data object. */
  private[predictionio]
  def getPDataObject[T](repo: String)(implicit tag: TypeTag[T]): T = {
    val repoDOMeta = repositoriesToDataObjectMeta(repo)
    val repoDOSourceName = repoDOMeta.sourceName
    getPDataObject[T](repoDOSourceName, repoDOMeta.namespace)
  }

  /** Reflectively instantiates the backend-specific implementation of data
    * object type T for the given source.
    *
    * The class name is derived as
    * org.apache.predictionio.data.storage.<sourceType>.<clientPrefix><simple name of T>,
    * falling back to <sourceType>.<clientPrefix><simple name> for custom
    * backends outside the PredictionIO namespace.
    *
    * @throws StorageClientException if the source was not initialized or no
    *         implementation class can be found
    */
  private[predictionio] def getDataObject[T](
    sourceName: String,
    namespace: String,
    parallel: Boolean = false,
    test: Boolean = false)(implicit tag: TypeTag[T]): T = {
    val clientMeta = sourcesToClientMeta(sourceName, parallel, test) getOrElse {
      throw new StorageClientException(
        s"Data source $sourceName was not properly initialized.", null)
    }
    val sourceType = clientMeta.sourceType
    val ctorArgs = dataObjectCtorArgs(clientMeta.client, namespace)
    val classPrefix = clientMeta.client.prefix
    val originalClassName = tag.tpe.toString.split('.')
    val rawClassName = sourceType + "." + classPrefix + originalClassName.last
    val className = "org.apache.predictionio.data.storage." + rawClassName
    val clazz = try {
      Class.forName(className)
    } catch {
      case e: ClassNotFoundException =>
        try {
          Class.forName(rawClassName)
        } catch {
          case e: ClassNotFoundException =>
            throw new StorageClientException("No storage backend " +
              "implementation can be found (tried both " +
              s"$className and $rawClassName)", e)
        }
    }
    // Implementations are expected to expose exactly one constructor taking
    // (client, config, namespace); see dataObjectCtorArgs.
    val constructor = clazz.getConstructors()(0)
    try {
      constructor.newInstance(ctorArgs: _*).
        asInstanceOf[T]
    } catch {
      case e: IllegalArgumentException =>
        error(
          "Unable to instantiate data object with class '" +
          constructor.getDeclaringClass.getName + " because its constructor" +
          " does not have the right number of arguments." +
          " Number of required constructor arguments: " +
          ctorArgs.size + "." +
          " Number of existing constructor arguments: " +
          constructor.getParameterTypes.size + "." +
          s" Storage source name: ${sourceName}." +
          s" Exception message: ${e.getMessage}).", e)
        errors += 1
        throw e
      case e: java.lang.reflect.InvocationTargetException =>
        throw e.getCause
    }
  }

  /** Parallel variant of getDataObject; always requests a parallel client. */
  private def getPDataObject[T](
    sourceName: String,
    databaseName: String)(implicit tag: TypeTag[T]): T =
    getDataObject[T](sourceName, databaseName, true)

  /** Constructor arguments common to all data object implementations. */
  private def dataObjectCtorArgs(
    client: BaseStorageClient,
    namespace: String): Seq[AnyRef] = {
    Seq(client.client, client.config, namespace)
  }

  /** Smoke-tests every configured backend: instantiates all meta-data and
    * model-data objects, then writes and removes a test Event under app id 0.
    */
  private[predictionio] def verifyAllDataObjects(): Unit = {
    info("Verifying Meta Data Backend (Source: " +
      s"${repositoriesToDataObjectMeta(MetaDataRepository).sourceName})...")
    getMetaDataEngineManifests()
    getMetaDataEngineInstances()
    getMetaDataEvaluationInstances()
    getMetaDataApps()
    getMetaDataAccessKeys()
    info("Verifying Model Data Backend (Source: " +
      s"${repositoriesToDataObjectMeta(ModelDataRepository).sourceName})...")
    getModelDataModels()
    info("Verifying Event Data Backend (Source: " +
      s"${repositoriesToDataObjectMeta(EventDataRepository).sourceName})...")
    val eventsDb = getLEvents(test = true)
    info("Test writing to Event Store (App Id 0)...")
    // use appId=0 for testing purpose
    eventsDb.init(0)
    eventsDb.insert(Event(
      event = "test",
      entityType = "test",
      entityId = "test"), 0)
    eventsDb.remove(0)
    eventsDb.close()
  }

  private[predictionio] def getMetaDataEngineManifests(): EngineManifests =
    getDataObjectFromRepo[EngineManifests](MetaDataRepository)

  private[predictionio] def getMetaDataEngineInstances(): EngineInstances =
    getDataObjectFromRepo[EngineInstances](MetaDataRepository)

  private[predictionio] def getMetaDataEvaluationInstances(): EvaluationInstances =
    getDataObjectFromRepo[EvaluationInstances](MetaDataRepository)

  private[predictionio] def getMetaDataApps(): Apps =
    getDataObjectFromRepo[Apps](MetaDataRepository)

  private[predictionio] def getMetaDataAccessKeys(): AccessKeys =
    getDataObjectFromRepo[AccessKeys](MetaDataRepository)

  private[predictionio] def getMetaDataChannels(): Channels =
    getDataObjectFromRepo[Channels](MetaDataRepository)

  private[predictionio] def getModelDataModels(): Models =
    getDataObjectFromRepo[Models](ModelDataRepository)

  /** Obtains a data access object that returns [[Event]] related local data
    * structure.
    */
  def getLEvents(test: Boolean = false): LEvents =
    getDataObjectFromRepo[LEvents](EventDataRepository, test = test)

  /** Obtains a data access object that returns [[Event]] related RDD data
    * structure.
    */
  def getPEvents(): PEvents =
    getPDataObject[PEvents](EventDataRepository)

  /** Summarizes every configured source (type plus flattened properties) for
    * display, e.g. by `pio status`.
    */
  def config: Map[String, Map[String, Map[String, String]]] = Map(
    "sources" -> s2cm.toMap.map { case (source, clientMeta) =>
      source -> clientMeta.map { cm =>
        Map(
          "type" -> cm.sourceType,
          "config" -> cm.config.properties.map(t => s"${t._1} -> ${t._2}").mkString(", ")
        )
      }.getOrElse(Map.empty)
    }
  )
}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
new file mode 100644
index 0000000..321b245
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Utils.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.joda.time.DateTime +import org.joda.time.format.ISODateTimeFormat + +/** Backend-agnostic storage utilities. */ +private[prediction] object Utils { + /** + * Add prefix to custom attribute keys. + */ + def addPrefixToAttributeKeys[T]( + attributes: Map[String, T], + prefix: String = "ca_"): Map[String, T] = { + attributes map { case (k, v) => (prefix + k, v) } + } + + /** Remove prefix from custom attribute keys. */ + def removePrefixFromAttributeKeys[T]( + attributes: Map[String, T], + prefix: String = "ca_"): Map[String, T] = { + attributes map { case (k, v) => (k.stripPrefix(prefix), v) } + } + + /** + * Appends App ID to any ID. + * Used for distinguishing different app's data within a single collection. 
+ */ + def idWithAppid(appid: Int, id: String): String = appid + "_" + id + + def stringToDateTime(dt: String): DateTime = + ISODateTimeFormat.dateTimeParser.parseDateTime(dt) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala new file mode 100644 index 0000000..7853d97 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -0,0 +1,116 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage.elasticsearch + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.AccessKey +import org.apache.predictionio.data.storage.AccessKeys +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.json4s.JsonDSL._ +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + +import scala.util.Random + +/** Elasticsearch implementation of AccessKeys. */ +class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) + extends AccessKeys with Logging { + implicit val formats = DefaultFormats.lossless + private val estype = "accesskeys" + + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = + (estype -> + ("properties" -> + ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(json))).get + } + + def insert(accessKey: AccessKey): Option[String] = { + val key = if (accessKey.key.isEmpty) generateKey else accessKey.key + update(accessKey.copy(key = key)) + Some(key) + } + + def get(key: String): Option[AccessKey] = { + try { + val response = client.prepareGet( + index, + estype, + key).get() + Some(read[AccessKey](response.getSourceAsString)) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + None + case e: NullPointerException => None + } + } + + def getAll(): Seq[AccessKey] = { + try { + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[AccessKey](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() + } + } + + def getByAppid(appid: Int): Seq[AccessKey] = { + try { + val builder = client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[AccessKey](client, builder) + } catch { + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() + } + } + + def update(accessKey: AccessKey): Unit = { + try { + client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } + + def delete(key: String): Unit = { + try { + client.prepareDelete(index, estype, key).get + } catch { + case e: ElasticsearchException => + error(e.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala new file mode 100644 index 0000000..6790b52 --- /dev/null +++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -0,0 +1,127 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.predictionio.data.storage.elasticsearch

import grizzled.slf4j.Logging
import org.apache.predictionio.data.storage.StorageClientConfig
import org.apache.predictionio.data.storage.App
import org.apache.predictionio.data.storage.Apps
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.client.Client
import org.elasticsearch.index.query.FilterBuilders._
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write

/** Elasticsearch implementation of Apps. */
class ESApps(client: Client, config: StorageClientConfig, index: String)
  extends Apps with Logging {
  implicit val formats = DefaultFormats.lossless
  private val estype = "apps"
  private val seq = new ESSequences(client, config, index)

  // Ensure the index and the "apps" mapping exist before first use.
  val indices = client.admin.indices
  val indexExistResponse = indices.prepareExists(index).get
  if (!indexExistResponse.isExists) {
    indices.prepareCreate(index).get
  }
  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
  if (!typeExistResponse.isExists) {
    // "name" is not analyzed so getByName can match it exactly.
    val json =
      (estype ->
        ("properties" ->
          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
    indices.preparePutMapping(index).setType(estype).
      setSource(compact(render(json))).get
  }

  /** Stores the app. When app.id == 0, draws ids from the "apps" sequence
    * until an unused one is found.
    *
    * @return the id under which the app was stored
    */
  def insert(app: App): Option[Int] = {
    val id =
      if (app.id == 0) {
        var roll = seq.genNext("apps")
        while (get(roll).isDefined) roll = seq.genNext("apps")
        roll
      }
      else app.id
    val realapp = app.copy(id = id)
    update(realapp)
    Some(id)
  }

  /** Fetches a single app by id; None when absent or on backend error. */
  def get(id: Int): Option[App] = {
    try {
      val response = client.prepareGet(
        index,
        estype,
        id.toString).get()
      Some(read[App](response.getSourceAsString))
    } catch {
      case e: ElasticsearchException =>
        error(e.getMessage)
        None
      // A missing document yields a null source; treat it as absent.
      case e: NullPointerException => None
    }
  }

  /** Fetches the first app whose name matches exactly; None when no hit or on
    * backend error.
    */
  def getByName(name: String): Option[App] = {
    try {
      val response = client.prepareSearch(index).setTypes(estype).
        setPostFilter(termFilter("name", name)).get
      val hits = response.getHits().hits()
      if (hits.size > 0) {
        Some(read[App](hits.head.getSourceAsString))
      } else {
        None
      }
    } catch {
      case e: ElasticsearchException =>
        error(e.getMessage)
        None
    }
  }

  /** Returns every stored app; empty on backend error. */
  def getAll(): Seq[App] = {
    try {
      val builder = client.prepareSearch(index).setTypes(estype)
      ESUtils.getAll[App](client, builder)
    } catch {
      case e: ElasticsearchException =>
        error(e.getMessage)
        Seq[App]()
    }
  }

  /** Upserts the app document keyed by its id; errors are logged only. */
  def update(app: App): Unit = {
    try {
      // The index response is not inspected; failures surface as exceptions.
      client.prepareIndex(index, estype, app.id.toString).
        setSource(write(app)).get()
    } catch {
      case e: ElasticsearchException =>
        error(e.getMessage)
    }
  }

  /** Deletes the app with the given id; errors are logged only. */
  def delete(id: Int): Unit = {
    try {
      client.prepareDelete(index, estype, id.toString).get
    } catch {
      case e: ElasticsearchException =>
        error(e.getMessage)
    }
  }
}
