http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala new file mode 100644 index 0000000..5bd7478 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala @@ -0,0 +1,200 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.view + +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Storage + +import org.joda.time.DateTime +import scala.language.implicitConversions + +import scala.concurrent.ExecutionContext.Implicits.global // TODO + +@deprecated("Use LEvents or LEventStore instead.", "0.9.2") +object ViewPredicates { + def getStartTimePredicate(startTimeOpt: Option[DateTime]) + : (Event => Boolean) = { + startTimeOpt.map(getStartTimePredicate).getOrElse(_ => true) + } + + def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = { + e => (!(e.eventTime.isBefore(startTime) || e.eventTime.isEqual(startTime))) + } + + def getUntilTimePredicate(untilTimeOpt: Option[DateTime]) + : (Event => Boolean) = { + untilTimeOpt.map(getUntilTimePredicate).getOrElse(_ => true) + } + + def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = { + _.eventTime.isBefore(untilTime) + } + + def getEntityTypePredicate(entityTypeOpt: Option[String]): (Event => Boolean) + = { + entityTypeOpt.map(getEntityTypePredicate).getOrElse(_ => true) + } + + def getEntityTypePredicate(entityType: String): (Event => Boolean) = { + (_.entityType == entityType) + } + + def getEventPredicate(eventOpt: Option[String]): (Event => Boolean) + = { + eventOpt.map(getEventPredicate).getOrElse(_ => true) + } + + def getEventPredicate(event: String): (Event => Boolean) = { + (_.event == event) + } +} + +@deprecated("Use LEvents instead.", "0.9.2") +object ViewAggregators { + def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = { + (p, e) => { + e.event match { + case "$set" => { + if (p == None) { + Some(e.properties) + } else { + p.map(_ ++ e.properties) + } + } + case "$unset" => { + if (p == None) { + None + } else { + p.map(_ -- e.properties.keySet) + } + } + case "$delete" => None + 
case _ => p // do nothing for others + } + } + } +} + +@deprecated("Use LEvents instead.", "0.9.2") +object EventSeq { + // Need to + // >>> import scala.language.implicitConversions + // to enable implicit conversion. Only import in the code where this is + // necessary to avoid confusion. + implicit def eventSeqToList(es: EventSeq): List[Event] = es.events + implicit def listToEventSeq(l: List[Event]): EventSeq = new EventSeq(l) +} + + +@deprecated("Use LEvents instead.", "0.9.2") +class EventSeq(val events: List[Event]) { + def filter( + eventOpt: Option[String] = None, + entityTypeOpt: Option[String] = None, + startTimeOpt: Option[DateTime] = None, + untilTimeOpt: Option[DateTime] = None): EventSeq = { + + events + .filter(ViewPredicates.getEventPredicate(eventOpt)) + .filter(ViewPredicates.getStartTimePredicate(startTimeOpt)) + .filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt)) + .filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt)) + } + + def filter(p: (Event => Boolean)): EventSeq = events.filter(p) + + def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T) + : Map[String, T] = { + events + .groupBy( _.entityId ) + .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op)) + .toMap + } + + +} + + +@deprecated("Use LEventStore instead.", "0.9.2") +class LBatchView( + val appId: Int, + val startTime: Option[DateTime], + val untilTime: Option[DateTime]) { + + @transient lazy val eventsDb = Storage.getLEvents() + + @transient lazy val _events = eventsDb.find( + appId = appId, + startTime = startTime, + untilTime = untilTime).toList + + @transient lazy val events: EventSeq = new EventSeq(_events) + + /* Aggregate event data + * + * @param entityType only aggregate event with entityType + * @param startTimeOpt if specified, only aggregate event after (inclusive) + * startTimeOpt + * @param untilTimeOpt if specified, only aggregate event until (exclusive) + * endTimeOpt + */ + def aggregateProperties( + entityType: String, + 
startTimeOpt: Option[DateTime] = None, + untilTimeOpt: Option[DateTime] = None + ): Map[String, DataMap] = { + + events + .filter(entityTypeOpt = Some(entityType)) + .filter(e => EventValidation.isSpecialEvents(e.event)) + .aggregateByEntityOrdered( + init = None, + op = ViewAggregators.getDataMapAggregator()) + .filter{ case (k, v) => (v != None) } + .mapValues(_.get) + + } + + /* + def aggregateByEntityOrdered[T]( + predicate: Event => Boolean, + init: T, + op: (T, Event) => T): Map[String, T] = { + + _events + .filter( predicate(_) ) + .groupBy( _.entityId ) + .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op)) + .toMap + + } + */ + + /* + def groupByEntityOrdered[T]( + predicate: Event => Boolean, + map: Event => T): Map[String, Seq[T]] = { + + _events + .filter( predicate(_) ) + .groupBy( _.entityId ) + .mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) ) + .toMap + } + */ +}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala new file mode 100644 index 0000000..6c75402 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala @@ -0,0 +1,209 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.view + +import org.apache.predictionio.data.storage.hbase.HBPEvents +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Storage + +import org.joda.time.DateTime + +import org.json4s.JValue + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + + +// each JValue data associated with the time it is set +private[prediction] case class PropTime(val d: JValue, val t: Long) extends Serializable + +private[prediction] case class SetProp ( + val fields: Map[String, PropTime], + // last set time. 
Note: fields could be empty with valid set time + val t: Long) extends Serializable { + + def ++ (that: SetProp): SetProp = { + val commonKeys = fields.keySet.intersect(that.fields.keySet) + + val common: Map[String, PropTime] = commonKeys.map { k => + val thisData = this.fields(k) + val thatData = that.fields(k) + // only keep the value with latest time + val v = if (thisData.t > thatData.t) thisData else thatData + (k, v) + }.toMap + + val combinedFields = common ++ + (this.fields -- commonKeys) ++ (that.fields -- commonKeys) + + // keep the latest set time + val combinedT = if (this.t > that.t) this.t else that.t + + SetProp( + fields = combinedFields, + t = combinedT + ) + } +} + +private[prediction] case class UnsetProp (fields: Map[String, Long]) extends Serializable { + def ++ (that: UnsetProp): UnsetProp = { + val commonKeys = fields.keySet.intersect(that.fields.keySet) + + val common: Map[String, Long] = commonKeys.map { k => + val thisData = this.fields(k) + val thatData = that.fields(k) + // only keep the value with latest time + val v = if (thisData > thatData) thisData else thatData + (k, v) + }.toMap + + val combinedFields = common ++ + (this.fields -- commonKeys) ++ (that.fields -- commonKeys) + + UnsetProp( + fields = combinedFields + ) + } +} + +private[prediction] case class DeleteEntity (t: Long) extends Serializable { + def ++ (that: DeleteEntity): DeleteEntity = { + if (this.t > that.t) this else that + } +} + +private[prediction] case class EventOp ( + val setProp: Option[SetProp] = None, + val unsetProp: Option[UnsetProp] = None, + val deleteEntity: Option[DeleteEntity] = None +) extends Serializable { + + def ++ (that: EventOp): EventOp = { + EventOp( + setProp = (setProp ++ that.setProp).reduceOption(_ ++ _), + unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _), + deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _) + ) + } + + def toDataMap(): Option[DataMap] = { + setProp.flatMap { set => + + val unsetKeys: 
Set[String] = unsetProp.map( unset => + unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet + ).getOrElse(Set()) + + val combinedFields = deleteEntity.map { delete => + if (delete.t >= set.t) { + None + } else { + val deleteKeys: Set[String] = set.fields + .filter { case (k, PropTime(kv, t)) => + (delete.t >= t) + }.keySet + Some(set.fields -- unsetKeys -- deleteKeys) + } + }.getOrElse{ + Some(set.fields -- unsetKeys) + } + + // Note: mapValues() doesn't return concrete Map and causes + // NotSerializableException issue. Use map(identity) to work around this. + // see https://issues.scala-lang.org/browse/SI-7005 + combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity))) + } + } + +} + +private[prediction] object EventOp { + def apply(e: Event): EventOp = { + val t = e.eventTime.getMillis + e.event match { + case "$set" => { + val fields = e.properties.fields.mapValues(jv => + PropTime(jv, t) + ).map(identity) + + EventOp( + setProp = Some(SetProp(fields = fields, t = t)) + ) + } + case "$unset" => { + val fields = e.properties.fields.mapValues(jv => t).map(identity) + EventOp( + unsetProp = Some(UnsetProp(fields = fields)) + ) + } + case "$delete" => { + EventOp( + deleteEntity = Some(DeleteEntity(t)) + ) + } + case _ => { + EventOp() + } + } + } +} + +@deprecated("Use PEvents or PEventStore instead.", "0.9.2") +class PBatchView( + val appId: Int, + val startTime: Option[DateTime], + val untilTime: Option[DateTime], + val sc: SparkContext) { + + // NOTE: parallel Events DB interface + @transient lazy val eventsDb = Storage.getPEvents() + + @transient lazy val _events: RDD[Event] = + eventsDb.getByAppIdAndTimeAndEntity( + appId = appId, + startTime = startTime, + untilTime = untilTime, + entityType = None, + entityId = None)(sc) + + // TODO: change to use EventSeq? 
+ @transient lazy val events: RDD[Event] = _events + + def aggregateProperties( + entityType: String, + startTimeOpt: Option[DateTime] = None, + untilTimeOpt: Option[DateTime] = None + ): RDD[(String, DataMap)] = { + + _events + .filter( e => ((e.entityType == entityType) && + (EventValidation.isSpecialEvents(e.event))) ) + .map( e => (e.entityId, EventOp(e) )) + .aggregateByKey[EventOp](EventOp())( + // within same partition + seqOp = { case (u, v) => u ++ v }, + // across partition + combOp = { case (accu, u) => accu ++ u } + ) + .mapValues(_.toDataMap) + .filter{ case (k, v) => v.isDefined } + .map{ case (k, v) => (k, v.get) } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala new file mode 100644 index 0000000..eba3276 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala @@ -0,0 +1,94 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.view + +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Storage + +import scala.concurrent.ExecutionContext.Implicits.global // TODO + +import grizzled.slf4j.Logger +import org.joda.time.DateTime + +import scala.language.implicitConversions + +class TestHBLEvents() { + @transient lazy val eventsDb = Storage.getLEvents() + + def run(): Unit = { + val r = eventsDb.find( + appId = 1, + startTime = None, + untilTime = None, + entityType = Some("pio_user"), + entityId = Some("3")).toList + println(r) + } +} + +class TestSource(val appId: Int) { + @transient lazy val logger = Logger[this.type] + @transient lazy val batchView = new LBatchView(appId, + None, None) + + def run(): Unit = { + println(batchView.events) + } +} + +object QuickTest { + + def main(args: Array[String]) { + val t = new TestHBLEvents() + t.run() + + // val ts = new TestSource(args(0).toInt) + // ts.run() + } +} + +object TestEventTime { + @transient lazy val batchView = new LBatchView(9, None, None) + + // implicit def back2list(es: EventSeq) = es.events + + def main(args: Array[String]) { + val e = batchView.events.filter( + eventOpt = Some("rate"), + startTimeOpt = Some(new DateTime(1998, 1, 1, 0, 0)) + // untilTimeOpt = Some(new DateTime(1997, 1, 1, 0, 0)) + ) + // untilTimeOpt = Some(new DateTime(2000, 1, 1, 0, 0))) + + e.foreach { println } + println() + println() + println() + val u = batchView.aggregateProperties("pio_item") + u.foreach { println } + println() + println() + println() + + // val l: Seq[Event] = e + val l = e.map { _.entityId } + l.foreach { println } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala 
---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala new file mode 100644 index 0000000..ee47a9c --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala @@ -0,0 +1,31 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.webhooks + +/** Webhooks Connector Exception + * + * @param message the detail message + * @param cause the cause + */ +private[prediction] class ConnectorException(message: String, cause: Throwable) + extends Exception(message, cause) { + + /** Webhooks Connector Exception with cause being set to null + * + * @param message the detail message + */ + def this(message: String) = this(message, null) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala new file mode 100644 index 0000000..40feb98 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala @@ -0,0 +1,46 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.webhooks + +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventJson4sSupport + +import org.json4s.Formats +import org.json4s.DefaultFormats +import org.json4s.JObject +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write + + +private[prediction] object ConnectorUtil { + + implicit val eventJson4sFormats: Formats = DefaultFormats + + new EventJson4sSupport.APISerializer + + // intentionally use EventJson4sSupport.APISerializer to convert + // from JSON to Event object. Don't allow connector directly create + // Event object so that the Event object formation is consistent + // by enforcing JSON format + + def toEvent(connector: JsonConnector, data: JObject): Event = { + read[Event](write(connector.toEventJson(data))) + } + + def toEvent(connector: FormConnector, data: Map[String, String]): Event = { + read[Event](write(connector.toEventJson(data))) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala new file mode 100644 index 0000000..dd04a21 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala @@ -0,0 +1,32 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.webhooks + +import org.json4s.JObject + +/** Connector for Webhooks connection with Form submission data format + */ +private[prediction] trait FormConnector { + + // TODO: support conversion to multiple events? + + /** Convert from original Form submission data to Event JObject + * @param data Map of key-value pairs in String type received through webhooks + * @return Event JObject + */ + def toEventJson(data: Map[String, String]): JObject + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala new file mode 100644 index 0000000..eda8059 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala @@ -0,0 +1,31 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.webhooks + +import org.json4s.JObject + +/** Connector for Webhooks connection */ +private[prediction] trait JsonConnector { + + // TODO: support conversion to multiple events? + + /** Convert from original JObject to Event JObject + * @param data original JObject received through webhooks + * @return Event JObject + */ + def toEventJson(data: JObject): JObject + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala new file mode 100644 index 0000000..adf8791 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala @@ -0,0 +1,123 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.webhooks.exampleform + +import org.apache.predictionio.data.webhooks.FormConnector +import org.apache.predictionio.data.webhooks.ConnectorException + +import org.json4s.JObject + + +/** Example FormConnector with following types of webhook form data inputs: + * + * UserAction + * + * "type"="userAction" + * "userId"="as34smg4", + * "event"="do_something", + * "context[ip]"="24.5.68.47", // optional + * "context[prop1]"="2.345", // optional + * "context[prop2]"="value1" // optional + * "anotherProperty1"="100", + * "anotherProperty2"="optional1", // optional + * "timestamp"="2015-01-02T00:30:12.984Z" + * + * UserActionItem + * + * "type"="userActionItem" + * "userId"="as34smg4", + * "event"="do_something_on", + * "itemId"="kfjd312bc", + * "context[ip]"="1.23.4.56", + * "context[prop1]"="2.345", + * "context[prop2]"="value1", + * "anotherPropertyA"="4.567", // optional + * "anotherPropertyB"="false", // optional + * "timestamp"="2015-01-15T04:20:23.567Z" + * + */ +private[prediction] object ExampleFormConnector extends FormConnector { + + override + def toEventJson(data: Map[String, String]): JObject = { + val json = try { + data.get("type") match { + case Some("userAction") => userActionToEventJson(data) + case Some("userActionItem") => userActionItemToEventJson(data) + case Some(x) => throw new ConnectorException( + s"Cannot convert unknown type ${x} to event JSON") + case None => throw new ConnectorException( + s"The field 'type' is required.") + } + } catch { + case e: ConnectorException => throw e + case e: Exception => throw new ConnectorException( + s"Cannot convert ${data} to event JSON. 
${e.getMessage()}", e) + } + json + } + + def userActionToEventJson(data: Map[String, String]): JObject = { + import org.json4s.JsonDSL._ + + // two level optional data + val context = if (data.exists(_._1.startsWith("context["))) { + Some( + ("ip" -> data.get("context[ip]")) ~ + ("prop1" -> data.get("context[prop1]").map(_.toDouble)) ~ + ("prop2" -> data.get("context[prop2]")) + ) + } else { + None + } + + val json = + ("event" -> data("event")) ~ + ("entityType" -> "user") ~ + ("entityId" -> data("userId")) ~ + ("eventTime" -> data("timestamp")) ~ + ("properties" -> ( + ("context" -> context) ~ + ("anotherProperty1" -> data("anotherProperty1").toInt) ~ + ("anotherProperty2" -> data.get("anotherProperty2")) + )) + json + } + + + def userActionItemToEventJson(data: Map[String, String]): JObject = { + import org.json4s.JsonDSL._ + + val json = + ("event" -> data("event")) ~ + ("entityType" -> "user") ~ + ("entityId" -> data("userId")) ~ + ("targetEntityType" -> "item") ~ + ("targetEntityId" -> data("itemId")) ~ + ("eventTime" -> data("timestamp")) ~ + ("properties" -> ( + ("context" -> ( + ("ip" -> data("context[ip]")) ~ + ("prop1" -> data("context[prop1]").toDouble) ~ + ("prop2" -> data("context[prop2]")) + )) ~ + ("anotherPropertyA" -> data.get("anotherPropertyA").map(_.toDouble)) ~ + ("anotherPropertyB" -> data.get("anotherPropertyB").map(_.toBoolean)) + )) + json + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala new file mode 100644 index 0000000..2129134 --- /dev/null +++ 
b/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala @@ -0,0 +1,153 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.webhooks.examplejson + +import org.apache.predictionio.data.webhooks.JsonConnector +import org.apache.predictionio.data.webhooks.ConnectorException + +import org.json4s.Formats +import org.json4s.DefaultFormats +import org.json4s.JObject + +/** Example JsonConnector with following types of webhooks JSON input: + * + * UserAction + * + * { + * "type": "userAction" + * "userId": "as34smg4", + * "event": "do_something", + * "context": { + * "ip": "24.5.68.47", + * "prop1": 2.345, + * "prop2": "value1" + * }, + * "anotherProperty1": 100, + * "anotherProperty2": "optional1", + * "timestamp": "2015-01-02T00:30:12.984Z" + * } + * + * UserActionItem + * + * { + * "type": "userActionItem" + * "userId": "as34smg4", + * "event": "do_something_on", + * "itemId": "kfjd312bc", + * "context": { + * "ip": "1.23.4.56", + * "prop1": 2.345, + * "prop2": "value1" + * }, + * "anotherPropertyA": 4.567, + * "anotherPropertyB": false, + * "timestamp": "2015-01-15T04:20:23.567Z" + * } + */ +private[prediction] object ExampleJsonConnector extends JsonConnector { + + implicit val json4sFormats: Formats = DefaultFormats + + override def toEventJson(data: JObject): JObject = { + val common = try { + data.extract[Common] + } catch { + case e: 
Exception => throw new ConnectorException( + s"Cannot extract Common field from ${data}. ${e.getMessage()}", e) + } + + val json = try { + common.`type` match { + case "userAction" => + toEventJson(common = common, userAction = data.extract[UserAction]) + case "userActionItem" => + toEventJson(common = common, userActionItem = data.extract[UserActionItem]) + case x: String => + throw new ConnectorException( + s"Cannot convert unknown type '${x}' to Event JSON.") + } + } catch { + case e: ConnectorException => throw e + case e: Exception => throw new ConnectorException( + s"Cannot convert ${data} to eventJson. ${e.getMessage()}", e) + } + + json + } + + def toEventJson(common: Common, userAction: UserAction): JObject = { + import org.json4s.JsonDSL._ + + // map to EventAPI JSON + val json = + ("event" -> userAction.event) ~ + ("entityType" -> "user") ~ + ("entityId" -> userAction.userId) ~ + ("eventTime" -> userAction.timestamp) ~ + ("properties" -> ( + ("context" -> userAction.context) ~ + ("anotherProperty1" -> userAction.anotherProperty1) ~ + ("anotherProperty2" -> userAction.anotherProperty2) + )) + json + } + + def toEventJson(common: Common, userActionItem: UserActionItem): JObject = { + import org.json4s.JsonDSL._ + + // map to EventAPI JSON + val json = + ("event" -> userActionItem.event) ~ + ("entityType" -> "user") ~ + ("entityId" -> userActionItem.userId) ~ + ("targetEntityType" -> "item") ~ + ("targetEntityId" -> userActionItem.itemId) ~ + ("eventTime" -> userActionItem.timestamp) ~ + ("properties" -> ( + ("context" -> userActionItem.context) ~ + ("anotherPropertyA" -> userActionItem.anotherPropertyA) ~ + ("anotherPropertyB" -> userActionItem.anotherPropertyB) + )) + json + } + + // Common required fields + case class Common( + `type`: String + ) + + // User Actions fields + case class UserAction ( + userId: String, + event: String, + context: Option[JObject], + anotherProperty1: Int, + anotherProperty2: Option[String], + timestamp: String + ) + + // 
UserActionItem fields + case class UserActionItem ( + userId: String, + event: String, + itemId: String, + context: JObject, + anotherPropertyA: Option[Double], + anotherPropertyB: Option[Boolean], + timestamp: String + ) + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala new file mode 100644 index 0000000..abf8a7f --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala @@ -0,0 +1,305 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.webhooks.mailchimp + +import org.apache.predictionio.data.webhooks.FormConnector +import org.apache.predictionio.data.webhooks.ConnectorException +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.Utils + +import org.json4s.JObject + +import org.joda.time.DateTime +import org.joda.time.format.DateTimeFormat + +private[prediction] object MailChimpConnector extends FormConnector { + + override + def toEventJson(data: Map[String, String]): JObject = { + + val json = data.get("type") match { + case Some("subscribe") => subscribeToEventJson(data) + // UNSUBSCRIBE + case Some("unsubscribe") => unsubscribeToEventJson(data) + // PROFILE UPDATES + case Some("profile") => profileToEventJson(data) + // EMAIL UPDATE + case Some("upemail") => upemailToEventJson(data) + // CLEANED EMAILS + case Some("cleaned") => cleanedToEventJson(data) + // CAMPAIGN SENDING STATUS + case Some("campaign") => campaignToEventJson(data) + // invalid type + case Some(x) => throw new ConnectorException( + s"Cannot convert unknown MailChimp data type ${x} to event JSON") + case None => throw new ConnectorException( + s"The field 'type' is required for MailChimp data.") + } + json + } + + + val mailChimpDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") + .withZone(EventValidation.defaultTimeZone) + + def parseMailChimpDateTime(s: String): DateTime = { + mailChimpDateTimeFormat.parseDateTime(s) + } + + def subscribeToEventJson(data: Map[String, String]): JObject = { + + import org.json4s.JsonDSL._ + + /* + "type": "subscribe", + "fired_at": "2009-03-26 21:35:57", + "data[id]": "8a25ff1d98", + "data[list_id]": "a6b5da1054", + "data[email]": "[email protected]", + "data[email_type]": "html", + "data[merges][EMAIL]": "[email protected]", + "data[merges][FNAME]": "MailChimp", + "data[merges][LNAME]": "API", + "data[merges][INTERESTS]": "Group1,Group2", + "data[ip_opt]": "10.20.10.30", + 
"data[ip_signup]": "10.20.10.30" + */ + + // convert to ISO8601 format + val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) + + // TODO: handle optional fields + val json = + ("event" -> "subscribe") ~ + ("entityType" -> "user") ~ + ("entityId" -> data("data[id]")) ~ + ("targetEntityType" -> "list") ~ + ("targetEntityId" -> data("data[list_id]")) ~ + ("eventTime" -> eventTime) ~ + ("properties" -> ( + ("email" -> data("data[email]")) ~ + ("email_type" -> data("data[email_type]")) ~ + ("merges" -> ( + ("EMAIL" -> data("data[merges][EMAIL]")) ~ + ("FNAME" -> data("data[merges][FNAME]"))) ~ + ("LNAME" -> data("data[merges][LNAME]")) ~ + ("INTERESTS" -> data.get("data[merges][INTERESTS]")) + )) ~ + ("ip_opt" -> data("data[ip_opt]")) ~ + ("ip_signup" -> data("data[ip_signup]") + )) + + json + + } + + def unsubscribeToEventJson(data: Map[String, String]): JObject = { + + import org.json4s.JsonDSL._ + + /* + "action" will either be "unsub" or "delete". + The reason will be "manual" unless caused by a spam complaint - then it will be "abuse" + + "type": "unsubscribe", + "fired_at": "2009-03-26 21:40:57", + "data[action]": "unsub", + "data[reason]": "manual", + "data[id]": "8a25ff1d98", + "data[list_id]": "a6b5da1054", + "data[email]": "[email protected]", + "data[email_type]": "html", + "data[merges][EMAIL]": "[email protected]", + "data[merges][FNAME]": "MailChimp", + "data[merges][LNAME]": "API", + "data[merges][INTERESTS]": "Group1,Group2", + "data[ip_opt]": "10.20.10.30", + "data[campaign_id]": "cb398d21d2", + */ + + // convert to ISO8601 format + val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) + + val json = + ("event" -> "unsubscribe") ~ + ("entityType" -> "user") ~ + ("entityId" -> data("data[id]")) ~ + ("targetEntityType" -> "list") ~ + ("targetEntityId" -> data("data[list_id]")) ~ + ("eventTime" -> eventTime) ~ + ("properties" -> ( + ("action" -> data("data[action]")) ~ + ("reason" -> 
data("data[reason]")) ~ + ("email" -> data("data[email]")) ~ + ("email_type" -> data("data[email_type]")) ~ + ("merges" -> ( + ("EMAIL" -> data("data[merges][EMAIL]")) ~ + ("FNAME" -> data("data[merges][FNAME]"))) ~ + ("LNAME" -> data("data[merges][LNAME]")) ~ + ("INTERESTS" -> data.get("data[merges][INTERESTS]")) + )) ~ + ("ip_opt" -> data("data[ip_opt]")) ~ + ("campaign_id" -> data("data[campaign_id]") + )) + + json + + } + + def profileToEventJson(data: Map[String, String]): JObject = { + + import org.json4s.JsonDSL._ + + /* + "type": "profile", + "fired_at": "2009-03-26 21:31:21", + "data[id]": "8a25ff1d98", + "data[list_id]": "a6b5da1054", + "data[email]": "[email protected]", + "data[email_type]": "html", + "data[merges][EMAIL]": "[email protected]", + "data[merges][FNAME]": "MailChimp", + "data[merges][LNAME]": "API", + "data[merges][INTERESTS]": "Group1,Group2", \\OPTIONAL + "data[ip_opt]": "10.20.10.30" + */ + + // convert to ISO8601 format + val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) + + val json = + ("event" -> "profile") ~ + ("entityType" -> "user") ~ + ("entityId" -> data("data[id]")) ~ + ("targetEntityType" -> "list") ~ + ("targetEntityId" -> data("data[list_id]")) ~ + ("eventTime" -> eventTime) ~ + ("properties" -> ( + ("email" -> data("data[email]")) ~ + ("email_type" -> data("data[email_type]")) ~ + ("merges" -> ( + ("EMAIL" -> data("data[merges][EMAIL]")) ~ + ("FNAME" -> data("data[merges][FNAME]"))) ~ + ("LNAME" -> data("data[merges][LNAME]")) ~ + ("INTERESTS" -> data.get("data[merges][INTERESTS]")) + )) ~ + ("ip_opt" -> data("data[ip_opt]") + )) + + json + + } + + def upemailToEventJson(data: Map[String, String]): JObject = { + + import org.json4s.JsonDSL._ + + /* + "type": "upemail", + "fired_at": "2009-03-26 22:15:09", + "data[list_id]": "a6b5da1054", + "data[new_id]": "51da8c3259", + "data[new_email]": "[email protected]", + "data[old_email]": "[email protected]" + */ + + // convert to ISO8601 format + 
val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) + + val json = + ("event" -> "upemail") ~ + ("entityType" -> "user") ~ + ("entityId" -> data("data[new_id]")) ~ + ("targetEntityType" -> "list") ~ + ("targetEntityId" -> data("data[list_id]")) ~ + ("eventTime" -> eventTime) ~ + ("properties" -> ( + ("new_email" -> data("data[new_email]")) ~ + ("old_email" -> data("data[old_email]")) + )) + + json + + } + + def cleanedToEventJson(data: Map[String, String]): JObject = { + + import org.json4s.JsonDSL._ + + /* + Reason will be one of "hard" (for hard bounces) or "abuse" + "type": "cleaned", + "fired_at": "2009-03-26 22:01:00", + "data[list_id]": "a6b5da1054", + "data[campaign_id]": "4fjk2ma9xd", + "data[reason]": "hard", + "data[email]": "[email protected]" + */ + + // convert to ISO8601 format + val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) + + val json = + ("event" -> "cleaned") ~ + ("entityType" -> "list") ~ + ("entityId" -> data("data[list_id]")) ~ + ("eventTime" -> eventTime) ~ + ("properties" -> ( + ("campaignId" -> data("data[campaign_id]")) ~ + ("reason" -> data("data[reason]")) ~ + ("email" -> data("data[email]")) + )) + + json + + } + + def campaignToEventJson(data: Map[String, String]): JObject = { + + import org.json4s.JsonDSL._ + + /* + "type": "campaign", + "fired_at": "2009-03-26 21:31:21", + "data[id]": "5aa2102003", + "data[subject]": "Test Campaign Subject", + "data[status]": "sent", + "data[reason]": "", + "data[list_id]": "a6b5da1054" + */ + + // convert to ISO8601 format + val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) + + val json = + ("event" -> "campaign") ~ + ("entityType" -> "campaign") ~ + ("entityId" -> data("data[id]")) ~ + ("targetEntityType" -> "list") ~ + ("targetEntityId" -> data("data[list_id]")) ~ + ("eventTime" -> eventTime) ~ + ("properties" -> ( + ("subject" -> data("data[subject]")) ~ + ("status" -> data("data[status]")) ~ + 
("reason" -> data("data[reason]")) + )) + + json + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala new file mode 100644 index 0000000..b7548b0 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala @@ -0,0 +1,306 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.webhooks.segmentio + +import org.apache.predictionio.data.webhooks.{ConnectorException, JsonConnector} +import org.json4s._ + +private[prediction] object SegmentIOConnector extends JsonConnector { + + // private lazy val supportedAPI = Vector("2", "2.0", "2.0.0") + + implicit val json4sFormats: Formats = DefaultFormats + + override + def toEventJson(data: JObject): JObject = { + try { + val version: String = data.values("version").toString +/* + if (!supportedAPI.contains(version)) { + throw new ConnectorException( + s"Supported segment.io API versions: [2]. 
got [$version]" + ) + } +*/ + } catch { case _: Throwable ⇒ + throw new ConnectorException(s"Failed to get segment.io API version.") + } + + val common = try { + data.extract[Common] + } catch { + case e: Throwable ⇒ throw new ConnectorException( + s"Cannot extract Common field from $data. ${e.getMessage}", e + ) + } + + try { + common.`type` match { + case "identify" ⇒ + toEventJson( + common = common, + identify = data.extract[Events.Identify] + ) + + case "track" ⇒ + toEventJson( + common = common, + track = data.extract[Events.Track] + ) + + case "alias" ⇒ + toEventJson( + common = common, + alias = data.extract[Events.Alias] + ) + + case "page" ⇒ + toEventJson( + common = common, + page = data.extract[Events.Page] + ) + + case "screen" ⇒ + toEventJson( + common = common, + screen = data.extract[Events.Screen] + ) + + case "group" ⇒ + toEventJson( + common = common, + group = data.extract[Events.Group] + ) + + case _ ⇒ + throw new ConnectorException( + s"Cannot convert unknown type ${common.`type`} to event JSON." + ) + } + } catch { + case e: ConnectorException => throw e + case e: Exception => + throw new ConnectorException( + s"Cannot convert $data to event JSON. 
${e.getMessage}", e + ) + } + } + + def toEventJson(common: Common, identify: Events.Identify ): JObject = { + import org.json4s.JsonDSL._ + val eventProperties = "traits" → identify.traits + toJson(common, eventProperties) + } + + def toEventJson(common: Common, track: Events.Track): JObject = { + import org.json4s.JsonDSL._ + val eventProperties = + ("properties" → track.properties) ~ + ("event" → track.event) + toJson(common, eventProperties) + } + + def toEventJson(common: Common, alias: Events.Alias): JObject = { + import org.json4s.JsonDSL._ + toJson(common, "previous_id" → alias.previous_id) + } + + def toEventJson(common: Common, screen: Events.Screen): JObject = { + import org.json4s.JsonDSL._ + val eventProperties = + ("name" → screen.name) ~ + ("properties" → screen.properties) + toJson(common, eventProperties) + } + + def toEventJson(common: Common, page: Events.Page): JObject = { + import org.json4s.JsonDSL._ + val eventProperties = + ("name" → page.name) ~ + ("properties" → page.properties) + toJson(common, eventProperties) + } + + def toEventJson(common: Common, group: Events.Group): JObject = { + import org.json4s.JsonDSL._ + val eventProperties = + ("group_id" → group.group_id) ~ + ("traits" → group.traits) + toJson(common, eventProperties) + } + + private def toJson(common: Common, props: JObject): JsonAST.JObject = { + val commonFields = commonToJson(common) + JObject(("properties" → properties(common, props)) :: commonFields.obj) + } + + private def properties(common: Common, eventProps: JObject): JObject = { + import org.json4s.JsonDSL._ + common.context map { context ⇒ + try { + ("context" → Extraction.decompose(context)) ~ eventProps + } catch { + case e: Throwable ⇒ + throw new ConnectorException( + s"Cannot convert $context to event JSON. 
${e.getMessage }", e + ) + } + } getOrElse eventProps + } + + private def commonToJson(common: Common): JObject = + commonToJson(common, common.`type`) + + private def commonToJson(common: Common, typ: String): JObject = { + import org.json4s.JsonDSL._ + common.user_id.orElse(common.anonymous_id) match { + case Some(userId) ⇒ + ("event" → typ) ~ + ("entityType" → "user") ~ + ("entityId" → userId) ~ + ("eventTime" → common.timestamp) + + case None ⇒ + throw new ConnectorException( + "there was no `userId` or `anonymousId` in the common fields." + ) + } + } +} + +object Events { + + private[prediction] case class Track( + event: String, + properties: Option[JObject] = None + ) + + private[prediction] case class Alias(previous_id: String, user_id: String) + + private[prediction] case class Group( + group_id: String, + traits: Option[JObject] = None + ) + + private[prediction] case class Screen( + name: Option[String] = None, + properties: Option[JObject] = None + ) + + private[prediction] case class Page( + name: Option[String] = None, + properties: Option[JObject] = None + ) + + private[prediction] case class Identify( + user_id: String, + traits: Option[JObject] + ) + +} + +object Common { + + private[prediction] case class Integrations( + All: Boolean = false, + Mixpanel: Boolean = false, + Marketo: Boolean = false, + Salesforse: Boolean = false + ) + + private[prediction] case class Context( + ip: String, + library: Library, + user_agent: String, + app: Option[App] = None, + campaign: Option[Campaign] = None, + device: Option[Device] = None, + network: Option[Network] = None, + location: Option[Location] = None, + os: Option[OS] = None, + referrer: Option[Referrer] = None, + screen: Option[Screen] = None, + timezone: Option[String] = None + ) + + private[prediction] case class Screen(width: Int, height: Int, density: Int) + + private[prediction] case class Referrer(id: String, `type`: String) + + private[prediction] case class OS(name: String, version: String) + + 
private[prediction] case class Location( + city: Option[String] = None, + country: Option[String] = None, + latitude: Option[Double] = None, + longitude: Option[Double] = None, + speed: Option[Int] = None + ) + + case class Page( + path: String, + referrer: String, + search: String, + title: String, + url: String + ) + + private[prediction] case class Network( + bluetooth: Option[Boolean] = None, + carrier: Option[String] = None, + cellular: Option[Boolean] = None, + wifi: Option[Boolean] = None + ) + + private[prediction] case class Library(name: String, version: String) + + private[prediction] case class Device( + id: Option[String] = None, + advertising_id: Option[String] = None, + ad_tracking_enabled: Option[Boolean] = None, + manufacturer: Option[String] = None, + model: Option[String] = None, + name: Option[String] = None, + `type`: Option[String] = None, + token: Option[String] = None + ) + + private[prediction] case class Campaign( + name: Option[String] = None, + source: Option[String] = None, + medium: Option[String] = None, + term: Option[String] = None, + content: Option[String] = None + ) + + private[prediction] case class App( + name: Option[String] = None, + version: Option[String] = None, + build: Option[String] = None + ) + +} + +private[prediction] case class Common( + `type`: String, + sent_at: String, + timestamp: String, + version: String, + anonymous_id: Option[String] = None, + user_id: Option[String] = None, + context: Option[Common.Context] = None, + integrations: Option[Common.Integrations] = None +) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala b/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala deleted file mode 100644 index 9f7a74e..0000000 --- 
a/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.api - -import io.prediction.data.storage.Storage - -import akka.testkit.TestProbe -import akka.actor.ActorSystem -import akka.actor.Props - -import spray.http.HttpEntity -import spray.http.HttpResponse -import spray.http.ContentTypes -import spray.httpx.RequestBuilding.Get - -import org.specs2.mutable.Specification - -class EventServiceSpec extends Specification { - - val system = ActorSystem("EventServiceSpecSystem") - - val eventClient = Storage.getLEvents() - val accessKeysClient = Storage.getMetaDataAccessKeys() - val channelsClient = Storage.getMetaDataChannels() - - val eventServiceActor = system.actorOf( - Props( - new EventServiceActor( - eventClient, - accessKeysClient, - channelsClient, - EventServerConfig() - ) - ) - ) - - "GET / request" should { - "properly produce OK HttpResponses" in { - val probe = TestProbe()(system) - probe.send(eventServiceActor, Get("/")) - probe.expectMsg( - HttpResponse( - 200, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"status":"alive"}""" - ) - ) - ) - success - } - } - - step(system.shutdown()) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala 
---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala deleted file mode 100644 index bae0f0b..0000000 --- a/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala +++ /dev/null @@ -1,175 +0,0 @@ -package io.prediction.data.api - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import io.prediction.data.storage._ -import org.joda.time.DateTime -import org.specs2.mutable.Specification -import spray.http.HttpHeaders.RawHeader -import spray.http.{ContentTypes, HttpEntity, HttpResponse} -import spray.httpx.RequestBuilding._ -import sun.misc.BASE64Encoder - -import scala.concurrent.{Future, ExecutionContext} - -class SegmentIOAuthSpec extends Specification { - - val system = ActorSystem("EventServiceSpecSystem") - sequential - isolated - val eventClient = new LEvents { - override def init(appId: Int, channelId: Option[Int]): Boolean = true - - override def futureInsert(event: Event, appId: Int, channelId: Option[Int]) - (implicit ec: ExecutionContext): Future[String] = - Future successful "event_id" - - override def futureFind( - appId: Int, channelId: Option[Int], startTime: Option[DateTime], - untilTime: Option[DateTime], entityType: Option[String], - entityId: Option[String], eventNames: Option[Seq[String]], - targetEntityType: Option[Option[String]], - targetEntityId: Option[Option[String]], limit: Option[Int], - reversed: Option[Boolean]) - (implicit ec: ExecutionContext): Future[Iterator[Event]] = - Future successful List.empty[Event].iterator - - override def futureGet(eventId: String, appId: Int, channelId: Option[Int]) - (implicit ec: ExecutionContext): Future[Option[Event]] = - Future successful None - - override def remove(appId: Int, channelId: Option[Int]): Boolean = true - - override def futureDelete(eventId: String, appId: Int, channelId: Option[Int]) - 
(implicit ec: ExecutionContext): Future[Boolean] = - Future successful true - - override def close(): Unit = {} - } - val appId = 0 - val accessKeysClient = new AccessKeys { - override def insert(k: AccessKey): Option[String] = null - override def getByAppid(appid: Int): Seq[AccessKey] = null - override def update(k: AccessKey): Unit = {} - override def delete(k: String): Unit = {} - override def getAll(): Seq[AccessKey] = null - - override def get(k: String): Option[AccessKey] = - k match { - case "abc" ⇒ Some(AccessKey(k, appId, Seq.empty)) - case _ ⇒ None - } - } - - val channelsClient = Storage.getMetaDataChannels() - val eventServiceActor = system.actorOf( - Props( - new EventServiceActor( - eventClient, - accessKeysClient, - channelsClient, - EventServerConfig() - ) - ) - ) - - val base64Encoder = new BASE64Encoder - - "Event Service" should { - - "reject with CredentialsRejected with invalid credentials" in { - val accessKey = "abc123:" - val probe = TestProbe()(system) - probe.send( - eventServiceActor, - Post("/webhooks/segmentio.json") - .withHeaders( - List( - RawHeader("Authorization", s"Basic $accessKey") - ) - ) - ) - probe.expectMsg( - HttpResponse( - 401, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"message":"Invalid accessKey."}""" - ) - ) - ) - success - } - - "reject with CredentialsMissed without credentials" in { - val probe = TestProbe()(system) - probe.send( - eventServiceActor, - Post("/webhooks/segmentio.json") - ) - probe.expectMsg( - HttpResponse( - 401, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"message":"Missing accessKey."}""" - ) - ) - ) - success - } - - "process SegmentIO identity request properly" in { - val jsonReq = - """ - |{ - | "anonymous_id": "507f191e810c19729de860ea", - | "channel": "browser", - | "context": { - | "ip": "8.8.8.8", - | "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)" - | }, - | "message_id": 
"022bb90c-bbac-11e4-8dfc-aa07a5b093db", - | "timestamp": "2015-02-23T22:28:55.387Z", - | "sent_at": "2015-02-23T22:28:55.111Z", - | "traits": { - | "name": "Peter Gibbons", - | "email": "[email protected]", - | "plan": "premium", - | "logins": 5 - | }, - | "type": "identify", - | "user_id": "97980cfea0067", - | "version": "2" - |} - """.stripMargin - - val accessKey = "abc:" - val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes) - val probe = TestProbe()(system) - probe.send( - eventServiceActor, - Post( - "/webhooks/segmentio.json", - HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes) - ).withHeaders( - List( - RawHeader("Authorization", s"Basic $accessKeyEncoded") - ) - ) - ) - probe.expectMsg( - HttpResponse( - 201, - HttpEntity( - contentType = ContentTypes.`application/json`, - string = """{"eventId":"event_id"}""" - ) - ) - ) - success - } - } - - step(system.shutdown()) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala b/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala deleted file mode 100644 index e6d28b3..0000000 --- a/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala +++ /dev/null @@ -1,196 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.specs2.mutable._ - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD - -class BiMapSpec extends Specification { - - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") - val sc = new SparkContext("local[4]", "BiMapSpec test") - - "BiMap created with map" should { - - val keys = Seq(1, 4, 6) - val orgValues = Seq(2, 5, 7) - val org = keys.zip(orgValues).toMap - val bi = BiMap(org) - - "return correct values for each key of original map" in { - val biValues = keys.map(k => bi(k)) - - biValues must beEqualTo(orgValues) - } - - "get return Option[V]" in { - val checkKeys = keys ++ Seq(12345) - val biValues = checkKeys.map(k => bi.get(k)) - val expected = orgValues.map(Some(_)) ++ Seq(None) - - biValues must beEqualTo(expected) - } - - "getOrElse return value for each key of original map" in { - val biValues = keys.map(k => bi.getOrElse(k, -1)) - - biValues must beEqualTo(orgValues) - } - - "getOrElse return default values for invalid key" in { - val keys = Seq(999, -1, -2) - val defaults = Seq(1234, 5678, 987) - val biValues = keys.zip(defaults).map{ case (k,d) => bi.getOrElse(k, d) } - - biValues must beEqualTo(defaults) - } - - "contains() returns true/false correctly" in { - val checkKeys = keys ++ Seq(12345) - val biValues = checkKeys.map(k => bi.contains(k)) - val expected = orgValues.map(_ => true) ++ Seq(false) - - biValues must beEqualTo(expected) - } - - "same size as original map" in { - (bi.size) must beEqualTo(org.size) - } - - "take(2) returns BiMap of size 2" in { - bi.take(2).size must beEqualTo(2) - } - - "toMap contain same element as original map" in { - (bi.toMap) must beEqualTo(org) - } - - "toSeq contain same element as original map" in { - (bi.toSeq) must 
containTheSameElementsAs(org.toSeq) - } - - "inverse and return correct keys for each values of original map" in { - val biKeys = orgValues.map(v => bi.inverse(v)) - biKeys must beEqualTo(keys) - } - - "inverse with same size" in { - bi.inverse.size must beEqualTo(org.size) - } - - "inverse's inverse reference back to the same original object" in { - // NOTE: reference equality - bi.inverse.inverse == bi - } - } - - "BiMap created with duplicated values in map" should { - val dup = Map(1 -> 2, 4 -> 7, 6 -> 7) - "return IllegalArgumentException" in { - BiMap(dup) must throwA[IllegalArgumentException] - } - } - - "BiMap.stringLong and stringInt" should { - - "create BiMap from set of string" in { - val keys = Set("a", "b", "foo", "bar") - val values: Seq[Long] = Seq(0, 1, 2, 3) - - val bi = BiMap.stringLong(keys) - val biValues = keys.map(k => bi(k)) - - val biInt = BiMap.stringInt(keys) - val valuesInt: Seq[Int] = values.map(_.toInt) - val biIntValues = keys.map(k => biInt(k)) - - biValues must containTheSameElementsAs(values) and - (biIntValues must containTheSameElementsAs(valuesInt)) - } - - "create BiMap from Array of unique string" in { - val keys = Array("a", "b", "foo", "bar") - val values: Seq[Long] = Seq(0, 1, 2, 3) - - val bi = BiMap.stringLong(keys) - val biValues = keys.toSeq.map(k => bi(k)) - - val biInt = BiMap.stringInt(keys) - val valuesInt: Seq[Int] = values.map(_.toInt) - val biIntValues = keys.toSeq.map(k => biInt(k)) - - biValues must containTheSameElementsAs(values) and - (biIntValues must containTheSameElementsAs(valuesInt)) - } - - "not guarantee sequential index for Array with duplicated string" in { - val keys = Array("a", "b", "foo", "bar", "a", "b", "x") - val dupValues: Seq[Long] = Seq(0, 1, 2, 3, 4, 5, 6) - val values = keys.zip(dupValues).toMap.values.toSeq - - val bi = BiMap.stringLong(keys) - val biValues = keys.toSet[String].map(k => bi(k)) - - val biInt = BiMap.stringInt(keys) - val valuesInt: Seq[Int] = values.map(_.toInt) - val 
biIntValues = keys.toSet[String].map(k => biInt(k)) - - biValues must containTheSameElementsAs(values) and - (biIntValues must containTheSameElementsAs(valuesInt)) - } - - "create BiMap from RDD[String]" in { - - val keys = Seq("a", "b", "foo", "bar") - val values: Seq[Long] = Seq(0, 1, 2, 3) - val rdd = sc.parallelize(keys) - - val bi = BiMap.stringLong(rdd) - val biValues = keys.map(k => bi(k)) - - val biInt = BiMap.stringInt(rdd) - val valuesInt: Seq[Int] = values.map(_.toInt) - val biIntValues = keys.map(k => biInt(k)) - - biValues must containTheSameElementsAs(values) and - (biIntValues must containTheSameElementsAs(valuesInt)) - } - - "create BiMap from RDD[String] with duplicated string" in { - - val keys = Seq("a", "b", "foo", "bar", "a", "b", "x") - val values: Seq[Long] = Seq(0, 1, 2, 3, 4) - val rdd = sc.parallelize(keys) - - val bi = BiMap.stringLong(rdd) - val biValues = keys.distinct.map(k => bi(k)) - - val biInt = BiMap.stringInt(rdd) - val valuesInt: Seq[Int] = values.map(_.toInt) - val biIntValues = keys.distinct.map(k => biInt(k)) - - biValues must containTheSameElementsAs(values) and - (biIntValues must containTheSameElementsAs(valuesInt)) - } - } - - step(sc.stop()) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala b/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala deleted file mode 100644 index 97e9b09..0000000 --- a/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala +++ /dev/null @@ -1,243 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.specs2.mutable._ - -class DataMapSpec extends Specification { - - "DataMap" should { - - val properties = DataMap(""" - { - "prop1" : 1, - "prop2" : "value2", - "prop3" : [1, 2, 3], - "prop4" : true, - "prop5" : ["a", "b", "c", "c"], - "prop6" : 4.56 - } - """) - - "get Int data" in { - properties.get[Int]("prop1") must beEqualTo(1) - properties.getOpt[Int]("prop1") must beEqualTo(Some(1)) - } - - "get String data" in { - properties.get[String]("prop2") must beEqualTo("value2") - properties.getOpt[String]("prop2") must beEqualTo(Some("value2")) - } - - "get List of Int data" in { - properties.get[List[Int]]("prop3") must beEqualTo(List(1,2,3)) - properties.getOpt[List[Int]]("prop3") must beEqualTo(Some(List(1,2,3))) - } - - "get Boolean data" in { - properties.get[Boolean]("prop4") must beEqualTo(true) - properties.getOpt[Boolean]("prop4") must beEqualTo(Some(true)) - } - - "get List of String data" in { - properties.get[List[String]]("prop5") must beEqualTo(List("a", "b", "c", "c")) - properties.getOpt[List[String]]("prop5") must beEqualTo(Some(List("a", "b", "c", "c"))) - } - - "get Set of String data" in { - properties.get[Set[String]]("prop5") must beEqualTo(Set("a", "b", "c")) - properties.getOpt[Set[String]]("prop5") must beEqualTo(Some(Set("a", "b", "c"))) - } - - "get Double data" in { - properties.get[Double]("prop6") must beEqualTo(4.56) - properties.getOpt[Double]("prop6") must beEqualTo(Some(4.56)) - } - - "get empty optional Int data" in { - properties.getOpt[Int]("prop9999") 
must beEqualTo(None) - } - - } - - "DataMap with multi-level data" should { - val properties = DataMap(""" - { - "context": { - "ip": "1.23.4.56", - "prop1": 2.345 - "prop2": "value1", - "prop4": [1, 2, 3] - }, - "anotherPropertyA": 4.567, - "anotherPropertyB": false - } - """) - - "get case class data" in { - val expected = DataMapSpec.Context( - ip = "1.23.4.56", - prop1 = Some(2.345), - prop2 = Some("value1"), - prop3 = None, - prop4 = List(1,2,3) - ) - - properties.get[DataMapSpec.Context]("context") must beEqualTo(expected) - } - - "get empty optional case class data" in { - properties.getOpt[DataMapSpec.Context]("context999") must beEqualTo(None) - } - - "get double data" in { - properties.get[Double]("anotherPropertyA") must beEqualTo(4.567) - } - - "get boolean data" in { - properties.get[Boolean]("anotherPropertyB") must beEqualTo(false) - } - } - - "DataMap extract" should { - - "extract to case class object" in { - val properties = DataMap(""" - { - "prop1" : 1, - "prop2" : "value2", - "prop3" : [1, 2, 3], - "prop4" : true, - "prop5" : ["a", "b", "c", "c"], - "prop6" : 4.56 - } - """) - - val result = properties.extract[DataMapSpec.BasicProperty] - val expected = DataMapSpec.BasicProperty( - prop1 = 1, - prop2 = "value2", - prop3 = List(1,2,3), - prop4 = true, - prop5 = List("a", "b", "c", "c"), - prop6 = 4.56 - ) - - result must beEqualTo(expected) - } - - "extract with optional fields" in { - val propertiesEmpty = DataMap("""{}""") - val propertiesSome = DataMap(""" - { - "prop1" : 1, - "prop5" : ["a", "b", "c", "c"], - "prop6" : 4.56 - } - """) - - val resultEmpty = propertiesEmpty.extract[DataMapSpec.OptionProperty] - val expectedEmpty = DataMapSpec.OptionProperty( - prop1 = None, - prop2 = None, - prop3 = None, - prop4 = None, - prop5 = None, - prop6 = None - ) - - val resultSome = propertiesSome.extract[DataMapSpec.OptionProperty] - val expectedSome = DataMapSpec.OptionProperty( - prop1 = Some(1), - prop2 = None, - prop3 = None, - prop4 = None, - 
prop5 = Some(List("a", "b", "c", "c")), - prop6 = Some(4.56) - ) - - resultEmpty must beEqualTo(expectedEmpty) - resultSome must beEqualTo(expectedSome) - } - - "extract to multi-level object" in { - val properties = DataMap(""" - { - "context": { - "ip": "1.23.4.56", - "prop1": 2.345 - "prop2": "value1", - "prop4": [1, 2, 3] - }, - "anotherPropertyA": 4.567, - "anotherPropertyB": false - } - """) - - val result = properties.extract[DataMapSpec.MultiLevelProperty] - val expected = DataMapSpec.MultiLevelProperty( - context = DataMapSpec.Context( - ip = "1.23.4.56", - prop1 = Some(2.345), - prop2 = Some("value1"), - prop3 = None, - prop4 = List(1,2,3) - ), - anotherPropertyA = 4.567, - anotherPropertyB = false - ) - - result must beEqualTo(expected) - } - - } -} - -object DataMapSpec { - - // define this case class inside object to avoid case class name conflict with other tests - case class Context( - ip: String, - prop1: Option[Double], - prop2: Option[String], - prop3: Option[Int], - prop4: List[Int] - ) - - case class BasicProperty( - prop1: Int, - prop2: String, - prop3: List[Int], - prop4: Boolean, - prop5: List[String], - prop6: Double - ) - - case class OptionProperty( - prop1: Option[Int], - prop2: Option[String], - prop3: Option[List[Int]], - prop4: Option[Boolean], - prop5: Option[List[String]], - prop6: Option[Double] - ) - - case class MultiLevelProperty( - context: Context, - anotherPropertyA: Double, - anotherPropertyB: Boolean - ) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala b/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala deleted file mode 100644 index 77a66d5..0000000 --- a/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala +++ /dev/null 
@@ -1,103 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.specs2.mutable._ - -import org.json4s.JObject -import org.json4s.native.JsonMethods.parse - -import org.joda.time.DateTime - -class LEventAggregatorSpec extends Specification with TestEvents { - - "LEventAggregator.aggregateProperties()" should { - - "aggregate two entities' properties as DataMap correctly" in { - val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) - val result: Map[String, DataMap] = - LEventAggregator.aggregateProperties(events.toIterator) - - val expected = Map( - "u1" -> DataMap(u1), - "u2" -> DataMap(u2) - ) - - result must beEqualTo(expected) - } - - "aggregate two entities' properties as PropertyMap correctly" in { - val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) - val result: Map[String, PropertyMap] = - LEventAggregator.aggregateProperties(events.toIterator) - - val expected = Map( - "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - result must beEqualTo(expected) - } - - - "aggregate deleted entity correctly" in { - val events = Vector(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2) - - val result = LEventAggregator.aggregateProperties(events.toIterator) - val expected = Map( - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - result must beEqualTo(expected) - } - } - 
- - "LEventAggregator.aggregatePropertiesSingle()" should { - - "aggregate single entity properties as DataMap correctly" in { - val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2) - val eventsIt = events.toIterator - - val result: Option[DataMap] = LEventAggregator - .aggregatePropertiesSingle(eventsIt) - val expected = DataMap(u1) - - result must beEqualTo(Some(expected)) - } - - "aggregate single entity properties as PropertyMap correctly" in { - val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2) - val eventsIt = events.toIterator - - val result: Option[PropertyMap] = LEventAggregator - .aggregatePropertiesSingle(eventsIt) - val expected = PropertyMap(u1, u1BaseTime, u1LastTime) - - result must beEqualTo(Some(expected)) - } - - "aggregate deleted entity correctly" in { - // put the delete event in the middle - val events = Vector(u1e4, u1e2, u1ed, u1e3, u1e1, u1e5) - val eventsIt = events.toIterator - - val result = LEventAggregator.aggregatePropertiesSingle(eventsIt) - - result must beEqualTo(None) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala b/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala deleted file mode 100644 index 5b38cdb..0000000 --- a/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala +++ /dev/null @@ -1,245 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.specs2._ -import org.specs2.specification.Step - -class LEventsSpec extends Specification with TestEvents { - def is = s2""" - - PredictionIO Storage LEvents Specification - - Events can be implemented by: - - HBLEvents ${hbEvents} - - JDBCLEvents ${jdbcLEvents} - - """ - - def hbEvents = sequential ^ s2""" - - HBLEvents should - - behave like any LEvents implementation ${events(hbDO)} - - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))} - - """ - - def jdbcLEvents = sequential ^ s2""" - - JDBCLEvents should - - behave like any LEvents implementation ${events(jdbcDO)} - - """ - - val appId = 1 - - def events(eventClient: LEvents) = sequential ^ s2""" - - init default ${initDefault(eventClient)} - insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)} - insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)} - insert and delete by ID ${insertAndDelete(eventClient)} - insert test user events ${insertTestUserEvents(eventClient)} - find user events ${findUserEvents(eventClient)} - aggregate user properties ${aggregateUserProperties(eventClient)} - aggregate one user properties ${aggregateOneUserProperties(eventClient)} - aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)} - init channel ${initChannel(eventClient)} - insert 2 events to channel ${insertChannel(eventClient)} - insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)} - find events from 
channel ${findChannel(eventClient)} - remove default ${removeDefault(eventClient)} - remove channel ${removeChannel(eventClient)} - - """ - - val dbName = "test_pio_storage_events_" + hashCode - def hbDO = Storage.getDataObject[LEvents]( - StorageTestUtils.hbaseSourceName, - dbName - ) - - def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName) - - def initDefault(eventClient: LEvents) = { - eventClient.init(appId) - } - - def insertAndGetEvents(eventClient: LEvents) = { - - // events from TestEvents trait - val listOfEvents = List(r1,r2,r3) - - val insertResp = listOfEvents.map { eventClient.insert(_, appId) } - - val insertedEventId: List[String] = insertResp - - val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) - .map { case (e, id) => Some(e.copy(eventId = Some(id))) } - - val getResp = insertedEventId.map { id => eventClient.get(id, appId) } - - val getEvents = getResp - - insertedEvent must containTheSameElementsAs(getEvents) - } - - def insertAndGetTimezone(eventClient: LEvents) = { - val listOfEvents = List(tz1, tz2, tz3) - - val insertResp = listOfEvents.map { eventClient.insert(_, appId) } - - val insertedEventId: List[String] = insertResp - - val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId) - .map { case (e, id) => Some(e.copy(eventId = Some(id))) } - - val getResp = insertedEventId.map { id => eventClient.get(id, appId) } - - val getEvents = getResp - - insertedEvent must containTheSameElementsAs(getEvents) - } - - def insertAndDelete(eventClient: LEvents) = { - val eventId = eventClient.insert(r2, appId) - - val resultBefore = eventClient.get(eventId, appId) - - val expectedBefore = r2.copy(eventId = Some(eventId)) - - val deleteStatus = eventClient.delete(eventId, appId) - - val resultAfter = eventClient.get(eventId, appId) - - (resultBefore must beEqualTo(Some(expectedBefore))) and - (deleteStatus must beEqualTo(true)) and - (resultAfter must beEqualTo(None)) - } - - def 
insertTestUserEvents(eventClient: LEvents) = { - // events from TestEvents trait - val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) - - listOfEvents.map{ eventClient.insert(_, appId) } - - success - } - - def findUserEvents(eventClient: LEvents) = { - - val results: List[Event] = eventClient.find( - appId = appId, - entityType = Some("user")) - .toList - .map(e => e.copy(eventId = None)) // ignore eventID - - // same events in insertTestUserEvents - val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) - - results must containTheSameElementsAs(expected) - } - - def aggregateUserProperties(eventClient: LEvents) = { - - val result: Map[String, PropertyMap] = eventClient.aggregateProperties( - appId = appId, - entityType = "user") - - val expected = Map( - "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - result must beEqualTo(expected) - } - - def aggregateOneUserProperties(eventClient: LEvents) = { - val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( - appId = appId, - entityType = "user", - entityId = "u1") - - val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime)) - - result must beEqualTo(expected) - } - - def aggregateNonExistentUserProperties(eventClient: LEvents) = { - val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity( - appId = appId, - entityType = "user", - entityId = "u999999") - - result must beEqualTo(None) - } - - val channelId = 12 - - def initChannel(eventClient: LEvents) = { - eventClient.init(appId, Some(channelId)) - } - - def insertChannel(eventClient: LEvents) = { - - // events from TestEvents trait - val listOfEvents = List(r4,r5) - - listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) ) - - success - } - - def insertAndDeleteChannel(eventClient: LEvents) = { - - val eventId = eventClient.insert(r2, appId, Some(channelId)) - - val resultBefore = eventClient.get(eventId, appId, 
Some(channelId)) - - val expectedBefore = r2.copy(eventId = Some(eventId)) - - val deleteStatus = eventClient.delete(eventId, appId, Some(channelId)) - - val resultAfter = eventClient.get(eventId, appId, Some(channelId)) - - (resultBefore must beEqualTo(Some(expectedBefore))) and - (deleteStatus must beEqualTo(true)) and - (resultAfter must beEqualTo(None)) - } - - def findChannel(eventClient: LEvents) = { - - val results: List[Event] = eventClient.find( - appId = appId, - channelId = Some(channelId) - ) - .toList - .map(e => e.copy(eventId = None)) // ignore eventId - - // same events in insertChannel - val expected = List(r4, r5) - - results must containTheSameElementsAs(expected) - } - - def removeDefault(eventClient: LEvents) = { - eventClient.remove(appId) - } - - def removeChannel(eventClient: LEvents) = { - eventClient.remove(appId, Some(channelId)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala b/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala deleted file mode 100644 index b00ec7c..0000000 --- a/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala +++ /dev/null @@ -1,72 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.storage - -import org.specs2.mutable._ - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - - -class PEventAggregatorSpec extends Specification with TestEvents { - - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") - val sc = new SparkContext("local[4]", "PEventAggregatorSpec test") - - "PEventAggregator" should { - - "aggregate two entities' properties as DataMap/PropertyMap correctly" in { - val events = sc.parallelize(Seq( - u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)) - - val users = PEventAggregator.aggregateProperties(events) - - val userMap = users.collectAsMap.toMap - val expectedDM = Map( - "u1" -> DataMap(u1), - "u2" -> DataMap(u2) - ) - - val expectedPM = Map( - "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - userMap must beEqualTo(expectedDM) - userMap must beEqualTo(expectedPM) - } - - "aggregate deleted entity correctly" in { - // put the delete event in middle - val events = sc.parallelize(Seq( - u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)) - - val users = PEventAggregator.aggregateProperties(events) - - val userMap = users.collectAsMap.toMap - val expectedPM = Map( - "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) - ) - - userMap must beEqualTo(expectedPM) - } - - } - - step(sc.stop()) -}
