http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/LEventStore.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/store/LEventStore.scala b/data/src/main/scala/io/prediction/data/store/LEventStore.scala deleted file mode 100644 index be543eb..0000000 --- a/data/src/main/scala/io/prediction/data/store/LEventStore.scala +++ /dev/null @@ -1,142 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.store - -import io.prediction.data.storage.Storage -import io.prediction.data.storage.Event - -import org.joda.time.DateTime - -import scala.concurrent.Await -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.Duration - -/** This object provides a set of operation to access Event Store - * without going through Spark's parallelization - */ -object LEventStore { - - private val defaultTimeout = Duration(60, "seconds") - - @transient lazy private val eventsDb = Storage.getLEvents() - - /** Reads events of the specified entity. May use this in Algorithm's predict() - * or Serving logic to have fast event store access. 
- * - * @param appName return events of this app - * @param entityType return events of this entityType - * @param entityId return events of this entityId - * @param channelName return events of this channel (default channel if it's None) - * @param eventNames return events with any of these event names. - * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. - * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param limit Limit number of events. Get all events if None or Some(-1) - * @param latest Return latest event first (default true) - * @return Iterator[Event] - */ - def findByEntity( - appName: String, - entityType: String, - entityId: String, - channelName: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - limit: Option[Int] = None, - latest: Boolean = true, - timeout: Duration = defaultTimeout): Iterator[Event] = { - - val (appId, channelId) = Common.appNameToId(appName, channelName) - - Await.result(eventsDb.futureFind( - appId = appId, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - entityType = Some(entityType), - entityId = Some(entityId), - eventNames = eventNames, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - limit = limit, - reversed = Some(latest)), - timeout) - } - - /** Reads events generically. 
If entityType or entityId is not specified, it - * results in table scan. - * - * @param appName return events of this app - * @param entityType return events of this entityType - * - None means no restriction on entityType - * - Some(x) means entityType should match x. - * @param entityId return events of this entityId - * - None means no restriction on entityId - * - Some(x) means entityId should match x. - * @param channelName return events of this channel (default channel if it's None) - * @param eventNames return events with any of these event names. - * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. - * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param limit Limit number of events. 
Get all events if None or Some(-1) - * @return Iterator[Event] - */ - def find( - appName: String, - entityType: Option[String] = None, - entityId: Option[String] = None, - channelName: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - limit: Option[Int] = None, - timeout: Duration = defaultTimeout): Iterator[Event] = { - - val (appId, channelId) = Common.appNameToId(appName, channelName) - - Await.result(eventsDb.futureFind( - appId = appId, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - entityType = entityType, - entityId = entityId, - eventNames = eventNames, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - limit = limit), timeout) - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/PEventStore.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/store/PEventStore.scala b/data/src/main/scala/io/prediction/data/store/PEventStore.scala deleted file mode 100644 index cd20da9..0000000 --- a/data/src/main/scala/io/prediction/data/store/PEventStore.scala +++ /dev/null @@ -1,116 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.store - -import io.prediction.data.storage.Storage -import io.prediction.data.storage.Event -import io.prediction.data.storage.PropertyMap - -import org.joda.time.DateTime - -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -/** This object provides a set of operation to access Event Store - * with Spark's parallelization - */ -object PEventStore { - - @transient lazy private val eventsDb = Storage.getPEvents() - - /** Read events from Event Store - * - * @param appName return events of this app - * @param channelName return events of this channel (default channel if it's None) - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param entityType return events of this entityType - * @param entityId return events of this entityId - * @param eventNames return events with any of these event names. - * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. - * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. 
- * @param sc Spark context - * @return RDD[Event] - */ - def find( - appName: String, - channelName: Option[String] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None - )(sc: SparkContext): RDD[Event] = { - - val (appId, channelId) = Common.appNameToId(appName, channelName) - - eventsDb.find( - appId = appId, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - entityType = entityType, - entityId = entityId, - eventNames = eventNames, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId - )(sc) - - } - - /** Aggregate properties of entities based on these special events: - * \$set, \$unset, \$delete events. - * - * @param appName use events of this app - * @param entityType aggregate properties of the entities of this entityType - * @param channelName use events of this channel (default channel if it's None) - * @param startTime use events with eventTime >= startTime - * @param untilTime use events with eventTime < untilTime - * @param required only keep entities with these required properties defined - * @param sc Spark context - * @return RDD[(String, PropertyMap)] RDD of entityId and PropetyMap pair - */ - def aggregateProperties( - appName: String, - entityType: String, - channelName: Option[String] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - required: Option[Seq[String]] = None) - (sc: SparkContext): RDD[(String, PropertyMap)] = { - - val (appId, channelId) = Common.appNameToId(appName, channelName) - - eventsDb.aggregateProperties( - appId = appId, - entityType = entityType, - channelId = channelId, - startTime = startTime, - untilTime = untilTime, - required = required - )(sc) - - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala b/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala deleted file mode 100644 index d619f65..0000000 --- a/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala +++ /dev/null @@ -1,142 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.store.java - -import io.prediction.data.storage.Event -import io.prediction.data.store.LEventStore -import org.joda.time.DateTime - -import scala.collection.JavaConversions -import scala.concurrent.duration.Duration - -/** This Java-friendly object provides a set of operation to access Event Store - * without going through Spark's parallelization - */ -object LJavaEventStore { - - /** Reads events of the specified entity. May use this in Algorithm's predict() - * or Serving logic to have fast event store access. - * - * @param appName return events of this app - * @param entityType return events of this entityType - * @param entityId return events of this entityId - * @param channelName return events of this channel (default channel if it's None) - * @param eventNames return events with any of these event names. 
- * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. - * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param limit Limit number of events. Get all events if None or Some(-1) - * @param latest Return latest event first - * @return java.util.List[Event] - */ - def findByEntity( - appName: String, - entityType: String, - entityId: String, - channelName: Option[String], - eventNames: Option[java.util.List[String]], - targetEntityType: Option[Option[String]], - targetEntityId: Option[Option[String]], - startTime: Option[DateTime], - untilTime: Option[DateTime], - limit: Option[Integer], - latest: Boolean, - timeout: Duration): java.util.List[Event] = { - - val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) - val limitInt = limit.map(_.intValue()) - - JavaConversions.seqAsJavaList( - LEventStore.findByEntity( - appName, - entityType, - entityId, - channelName, - eventNamesSeq, - targetEntityType, - targetEntityId, - startTime, - untilTime, - limitInt, - latest, - timeout - ).toSeq) - } - - /** Reads events generically. If entityType or entityId is not specified, it - * results in table scan. - * - * @param appName return events of this app - * @param entityType return events of this entityType - * - None means no restriction on entityType - * - Some(x) means entityType should match x. - * @param entityId return events of this entityId - * - None means no restriction on entityId - * - Some(x) means entityId should match x. 
- * @param channelName return events of this channel (default channel if it's None) - * @param eventNames return events with any of these event names. - * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. - * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param limit Limit number of events. Get all events if None or Some(-1) - * @return java.util.List[Event] - */ - def find( - appName: String, - entityType: Option[String], - entityId: Option[String], - channelName: Option[String], - eventNames: Option[java.util.List[String]], - targetEntityType: Option[Option[String]], - targetEntityId: Option[Option[String]], - startTime: Option[DateTime], - untilTime: Option[DateTime], - limit: Option[Integer], - timeout: Duration): java.util.List[Event] = { - - val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) - val limitInt = limit.map(_.intValue()) - - JavaConversions.seqAsJavaList( - LEventStore.find( - appName, - entityType, - entityId, - channelName, - eventNamesSeq, - targetEntityType, - targetEntityId, - startTime, - untilTime, - limitInt, - timeout - ).toSeq) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala b/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala deleted file mode 100644 index 
dee608d..0000000 --- a/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.store.java - -/** Used by Java-based engines to mock Some and None */ -object OptionHelper { - /** Mimics a None from Java-based engine */ - def none[T]: Option[T] = { - Option(null.asInstanceOf[T]) - } - - /** Mimics a Some from Java-based engine */ - def some[T](value: T): Option[T] = { - Some(value) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala b/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala deleted file mode 100644 index c0657d2..0000000 --- a/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala +++ /dev/null @@ -1,109 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.store.java - -import io.prediction.data.storage.Event -import io.prediction.data.storage.PropertyMap -import io.prediction.data.store.PEventStore -import org.apache.spark.SparkContext -import org.apache.spark.api.java.JavaRDD -import org.joda.time.DateTime - -import scala.collection.JavaConversions - -/** This Java-friendly object provides a set of operation to access Event Store - * with Spark's parallelization - */ -object PJavaEventStore { - - /** Read events from Event Store - * - * @param appName return events of this app - * @param channelName return events of this channel (default channel if it's None) - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param entityType return events of this entityType - * @param entityId return events of this entityId - * @param eventNames return events with any of these event names. - * @param targetEntityType return events of this targetEntityType: - * - None means no restriction on targetEntityType - * - Some(None) means no targetEntityType for this event - * - Some(Some(x)) means targetEntityType should match x. - * @param targetEntityId return events of this targetEntityId - * - None means no restriction on targetEntityId - * - Some(None) means no targetEntityId for this event - * - Some(Some(x)) means targetEntityId should match x. 
- * @param sc Spark context - * @return JavaRDD[Event] - */ - def find( - appName: String, - channelName: Option[String], - startTime: Option[DateTime], - untilTime: Option[DateTime], - entityType: Option[String], - entityId: Option[String], - eventNames: Option[java.util.List[String]], - targetEntityType: Option[Option[String]], - targetEntityId: Option[Option[String]], - sc: SparkContext): JavaRDD[Event] = { - - val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq) - - PEventStore.find( - appName, - channelName, - startTime, - untilTime, - entityType, - entityId, - eventNamesSeq, - targetEntityType, - targetEntityId - )(sc) - } - - /** Aggregate properties of entities based on these special events: - * \$set, \$unset, \$delete events. - * - * @param appName use events of this app - * @param entityType aggregate properties of the entities of this entityType - * @param channelName use events of this channel (default channel if it's None) - * @param startTime use events with eventTime >= startTime - * @param untilTime use events with eventTime < untilTime - * @param required only keep entities with these required properties defined - * @param sc Spark context - * @return JavaRDD[(String, PropertyMap)] JavaRDD of entityId and PropetyMap pair - */ - def aggregateProperties( - appName: String, - entityType: String, - channelName: Option[String], - startTime: Option[DateTime], - untilTime: Option[DateTime], - required: Option[java.util.List[String]], - sc: SparkContext): JavaRDD[(String, PropertyMap)] = { - - PEventStore.aggregateProperties( - appName, - entityType, - channelName, - startTime, - untilTime - )(sc) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/store/package.scala 
b/data/src/main/scala/io/prediction/data/store/package.scala deleted file mode 100644 index 4856416..0000000 --- a/data/src/main/scala/io/prediction/data/store/package.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data - -/** Provides high level interfaces to the Event Store from within a prediction - * engine. - */ -package object store {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/DataView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/view/DataView.scala b/data/src/main/scala/io/prediction/data/view/DataView.scala deleted file mode 100644 index 52a67fd..0000000 --- a/data/src/main/scala/io/prediction/data/view/DataView.scala +++ /dev/null @@ -1,110 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.view - -import io.prediction.annotation.Experimental -import io.prediction.data.storage.Event - -import grizzled.slf4j.Logger -import io.prediction.data.store.PEventStore - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.SQLContext -import org.joda.time.DateTime - -import scala.reflect.ClassTag -import scala.reflect.runtime.universe._ -import scala.util.hashing.MurmurHash3 - -/** - * :: Experimental :: - */ -@Experimental -object DataView { - /** - * :: Experimental :: - * - * Create a DataFrame from events of a specified app. - * - * @param appName return events of this app - * @param channelName use events of this channel (default channel if it's None) - * @param startTime return events with eventTime >= startTime - * @param untilTime return events with eventTime < untilTime - * @param conversionFunction a function that turns raw Events into events of interest. - * If conversionFunction returns None, such events are dropped. - * @param name identify the DataFrame created - * @param version used to track changes to the conversionFunction, e.g. version = "20150413" - * and update whenever the function is changed. - * @param sqlContext SQL context - * @tparam E the output type of the conversion function. The type needs to extend Product - * (e.g. 
case class) - * @return a DataFrame of events - */ - @Experimental - def create[E <: Product: TypeTag: ClassTag]( - appName: String, - channelName: Option[String] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - conversionFunction: Event => Option[E], - name: String = "", - version: String = "")(sqlContext: SQLContext): DataFrame = { - - @transient lazy val logger = Logger[this.type] - - val sc = sqlContext.sparkContext - - val beginTime = startTime match { - case Some(t) => t - case None => new DateTime(0L) - } - val endTime = untilTime match { - case Some(t) => t - case None => DateTime.now() // fix the current time - } - // detect changes to the case class - val uid = java.io.ObjectStreamClass.lookup(implicitly[reflect.ClassTag[E]].runtimeClass) - .getSerialVersionUID - val hash = MurmurHash3.stringHash(s"$beginTime-$endTime-$version-$uid") - val baseDir = s"${sys.env("PIO_FS_BASEDIR")}/view" - val fileName = s"$baseDir/$name-$appName-$hash.parquet" - try { - sqlContext.parquetFile(fileName) - } catch { - case e: java.io.FileNotFoundException => - logger.info("Cached copy not found, reading from DB.") - // if cached copy is found, use it. If not, grab from Storage - val result: RDD[E] = PEventStore.find( - appName = appName, - channelName = channelName, - startTime = startTime, - untilTime = Some(endTime))(sc) - .flatMap((e) => conversionFunction(e)) - import sqlContext.implicits._ // needed for RDD.toDF() - val resultDF = result.toDF() - - resultDF.saveAsParquetFile(fileName) - sqlContext.parquetFile(fileName) - case e: java.lang.RuntimeException => - if (e.toString.contains("is not a Parquet file")) { - logger.error(s"$fileName does not contain a valid Parquet file. 
" + - "Please delete it and try again.") - } - throw e - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/LBatchView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/view/LBatchView.scala b/data/src/main/scala/io/prediction/data/view/LBatchView.scala deleted file mode 100644 index f806056..0000000 --- a/data/src/main/scala/io/prediction/data/view/LBatchView.scala +++ /dev/null @@ -1,200 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.view - -import io.prediction.data.storage.Event -import io.prediction.data.storage.EventValidation -import io.prediction.data.storage.DataMap -import io.prediction.data.storage.Storage - -import org.joda.time.DateTime -import scala.language.implicitConversions - -import scala.concurrent.ExecutionContext.Implicits.global // TODO - -@deprecated("Use LEvents or LEventStore instead.", "0.9.2") -object ViewPredicates { - def getStartTimePredicate(startTimeOpt: Option[DateTime]) - : (Event => Boolean) = { - startTimeOpt.map(getStartTimePredicate).getOrElse(_ => true) - } - - def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = { - e => (!(e.eventTime.isBefore(startTime) || e.eventTime.isEqual(startTime))) - } - - def getUntilTimePredicate(untilTimeOpt: Option[DateTime]) - : (Event => Boolean) = { - untilTimeOpt.map(getUntilTimePredicate).getOrElse(_ => true) - } - - def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = { - _.eventTime.isBefore(untilTime) - } - - def getEntityTypePredicate(entityTypeOpt: Option[String]): (Event => Boolean) - = { - entityTypeOpt.map(getEntityTypePredicate).getOrElse(_ => true) - } - - def getEntityTypePredicate(entityType: String): (Event => Boolean) = { - (_.entityType == entityType) - } - - def getEventPredicate(eventOpt: Option[String]): (Event => Boolean) - = { - eventOpt.map(getEventPredicate).getOrElse(_ => true) - } - - def getEventPredicate(event: String): (Event => Boolean) = { - (_.event == event) - } -} - -@deprecated("Use LEvents instead.", "0.9.2") -object ViewAggregators { - def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = { - (p, e) => { - e.event match { - case "$set" => { - if (p == None) { - Some(e.properties) - } else { - p.map(_ ++ e.properties) - } - } - case "$unset" => { - if (p == None) { - None - } else { - p.map(_ -- e.properties.keySet) - } - } - case "$delete" => None - case _ => p // do nothing for others - } - } - } -} 
- -@deprecated("Use LEvents instead.", "0.9.2") -object EventSeq { - // Need to - // >>> import scala.language.implicitConversions - // to enable implicit conversion. Only import in the code where this is - // necessary to avoid confusion. - implicit def eventSeqToList(es: EventSeq): List[Event] = es.events - implicit def listToEventSeq(l: List[Event]): EventSeq = new EventSeq(l) -} - - -@deprecated("Use LEvents instead.", "0.9.2") -class EventSeq(val events: List[Event]) { - def filter( - eventOpt: Option[String] = None, - entityTypeOpt: Option[String] = None, - startTimeOpt: Option[DateTime] = None, - untilTimeOpt: Option[DateTime] = None): EventSeq = { - - events - .filter(ViewPredicates.getEventPredicate(eventOpt)) - .filter(ViewPredicates.getStartTimePredicate(startTimeOpt)) - .filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt)) - .filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt)) - } - - def filter(p: (Event => Boolean)): EventSeq = events.filter(p) - - def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T) - : Map[String, T] = { - events - .groupBy( _.entityId ) - .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op)) - .toMap - } - - -} - - -@deprecated("Use LEventStore instead.", "0.9.2") -class LBatchView( - val appId: Int, - val startTime: Option[DateTime], - val untilTime: Option[DateTime]) { - - @transient lazy val eventsDb = Storage.getLEvents() - - @transient lazy val _events = eventsDb.find( - appId = appId, - startTime = startTime, - untilTime = untilTime).toList - - @transient lazy val events: EventSeq = new EventSeq(_events) - - /* Aggregate event data - * - * @param entityType only aggregate event with entityType - * @param startTimeOpt if specified, only aggregate event after (inclusive) - * startTimeOpt - * @param untilTimeOpt if specified, only aggregate event until (exclusive) - * endTimeOpt - */ - def aggregateProperties( - entityType: String, - startTimeOpt: Option[DateTime] = None, - untilTimeOpt: 
Option[DateTime] = None - ): Map[String, DataMap] = { - - events - .filter(entityTypeOpt = Some(entityType)) - .filter(e => EventValidation.isSpecialEvents(e.event)) - .aggregateByEntityOrdered( - init = None, - op = ViewAggregators.getDataMapAggregator()) - .filter{ case (k, v) => (v != None) } - .mapValues(_.get) - - } - - /* - def aggregateByEntityOrdered[T]( - predicate: Event => Boolean, - init: T, - op: (T, Event) => T): Map[String, T] = { - - _events - .filter( predicate(_) ) - .groupBy( _.entityId ) - .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op)) - .toMap - - } - */ - - /* - def groupByEntityOrdered[T]( - predicate: Event => Boolean, - map: Event => T): Map[String, Seq[T]] = { - - _events - .filter( predicate(_) ) - .groupBy( _.entityId ) - .mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) ) - .toMap - } - */ -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/PBatchView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/view/PBatchView.scala b/data/src/main/scala/io/prediction/data/view/PBatchView.scala deleted file mode 100644 index 5b0f878..0000000 --- a/data/src/main/scala/io/prediction/data/view/PBatchView.scala +++ /dev/null @@ -1,209 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.view - -import io.prediction.data.storage.hbase.HBPEvents -import io.prediction.data.storage.Event -import io.prediction.data.storage.EventValidation -import io.prediction.data.storage.DataMap -import io.prediction.data.storage.Storage - -import org.joda.time.DateTime - -import org.json4s.JValue - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - - -// each JValue data associated with the time it is set -private[prediction] case class PropTime(val d: JValue, val t: Long) extends Serializable - -private[prediction] case class SetProp ( - val fields: Map[String, PropTime], - // last set time. Note: fields could be empty with valid set time - val t: Long) extends Serializable { - - def ++ (that: SetProp): SetProp = { - val commonKeys = fields.keySet.intersect(that.fields.keySet) - - val common: Map[String, PropTime] = commonKeys.map { k => - val thisData = this.fields(k) - val thatData = that.fields(k) - // only keep the value with latest time - val v = if (thisData.t > thatData.t) thisData else thatData - (k, v) - }.toMap - - val combinedFields = common ++ - (this.fields -- commonKeys) ++ (that.fields -- commonKeys) - - // keep the latest set time - val combinedT = if (this.t > that.t) this.t else that.t - - SetProp( - fields = combinedFields, - t = combinedT - ) - } -} - -private[prediction] case class UnsetProp (fields: Map[String, Long]) extends Serializable { - def ++ (that: UnsetProp): UnsetProp = { - val commonKeys = fields.keySet.intersect(that.fields.keySet) - - val common: Map[String, Long] = commonKeys.map { k => - val thisData = this.fields(k) - val thatData = that.fields(k) - // only keep the value with latest time - val v = if (thisData > thatData) thisData else thatData - (k, v) - }.toMap - - val combinedFields = common ++ - (this.fields -- commonKeys) ++ (that.fields -- commonKeys) - - UnsetProp( - fields = combinedFields - ) - } -} - -private[prediction] 
case class DeleteEntity (t: Long) extends Serializable { - def ++ (that: DeleteEntity): DeleteEntity = { - if (this.t > that.t) this else that - } -} - -private[prediction] case class EventOp ( - val setProp: Option[SetProp] = None, - val unsetProp: Option[UnsetProp] = None, - val deleteEntity: Option[DeleteEntity] = None -) extends Serializable { - - def ++ (that: EventOp): EventOp = { - EventOp( - setProp = (setProp ++ that.setProp).reduceOption(_ ++ _), - unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _), - deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _) - ) - } - - def toDataMap(): Option[DataMap] = { - setProp.flatMap { set => - - val unsetKeys: Set[String] = unsetProp.map( unset => - unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet - ).getOrElse(Set()) - - val combinedFields = deleteEntity.map { delete => - if (delete.t >= set.t) { - None - } else { - val deleteKeys: Set[String] = set.fields - .filter { case (k, PropTime(kv, t)) => - (delete.t >= t) - }.keySet - Some(set.fields -- unsetKeys -- deleteKeys) - } - }.getOrElse{ - Some(set.fields -- unsetKeys) - } - - // Note: mapValues() doesn't return concrete Map and causes - // NotSerializableException issue. Use map(identity) to work around this. 
- // see https://issues.scala-lang.org/browse/SI-7005 - combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity))) - } - } - -} - -private[prediction] object EventOp { - def apply(e: Event): EventOp = { - val t = e.eventTime.getMillis - e.event match { - case "$set" => { - val fields = e.properties.fields.mapValues(jv => - PropTime(jv, t) - ).map(identity) - - EventOp( - setProp = Some(SetProp(fields = fields, t = t)) - ) - } - case "$unset" => { - val fields = e.properties.fields.mapValues(jv => t).map(identity) - EventOp( - unsetProp = Some(UnsetProp(fields = fields)) - ) - } - case "$delete" => { - EventOp( - deleteEntity = Some(DeleteEntity(t)) - ) - } - case _ => { - EventOp() - } - } - } -} - -@deprecated("Use PEvents or PEventStore instead.", "0.9.2") -class PBatchView( - val appId: Int, - val startTime: Option[DateTime], - val untilTime: Option[DateTime], - val sc: SparkContext) { - - // NOTE: parallel Events DB interface - @transient lazy val eventsDb = Storage.getPEvents() - - @transient lazy val _events: RDD[Event] = - eventsDb.getByAppIdAndTimeAndEntity( - appId = appId, - startTime = startTime, - untilTime = untilTime, - entityType = None, - entityId = None)(sc) - - // TODO: change to use EventSeq? 
- @transient lazy val events: RDD[Event] = _events - - def aggregateProperties( - entityType: String, - startTimeOpt: Option[DateTime] = None, - untilTimeOpt: Option[DateTime] = None - ): RDD[(String, DataMap)] = { - - _events - .filter( e => ((e.entityType == entityType) && - (EventValidation.isSpecialEvents(e.event))) ) - .map( e => (e.entityId, EventOp(e) )) - .aggregateByKey[EventOp](EventOp())( - // within same partition - seqOp = { case (u, v) => u ++ v }, - // across partition - combOp = { case (accu, u) => accu ++ u } - ) - .mapValues(_.toDataMap) - .filter{ case (k, v) => v.isDefined } - .map{ case (k, v) => (k, v.get) } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/QuickTest.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/view/QuickTest.scala b/data/src/main/scala/io/prediction/data/view/QuickTest.scala deleted file mode 100644 index 68ade1d..0000000 --- a/data/src/main/scala/io/prediction/data/view/QuickTest.scala +++ /dev/null @@ -1,94 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.view - -import io.prediction.data.storage.Event -import io.prediction.data.storage.LEvents -import io.prediction.data.storage.EventValidation -import io.prediction.data.storage.DataMap -import io.prediction.data.storage.Storage - -import scala.concurrent.ExecutionContext.Implicits.global // TODO - -import grizzled.slf4j.Logger -import org.joda.time.DateTime - -import scala.language.implicitConversions - -class TestHBLEvents() { - @transient lazy val eventsDb = Storage.getLEvents() - - def run(): Unit = { - val r = eventsDb.find( - appId = 1, - startTime = None, - untilTime = None, - entityType = Some("pio_user"), - entityId = Some("3")).toList - println(r) - } -} - -class TestSource(val appId: Int) { - @transient lazy val logger = Logger[this.type] - @transient lazy val batchView = new LBatchView(appId, - None, None) - - def run(): Unit = { - println(batchView.events) - } -} - -object QuickTest { - - def main(args: Array[String]) { - val t = new TestHBLEvents() - t.run() - - // val ts = new TestSource(args(0).toInt) - // ts.run() - } -} - -object TestEventTime { - @transient lazy val batchView = new LBatchView(9, None, None) - - // implicit def back2list(es: EventSeq) = es.events - - def main(args: Array[String]) { - val e = batchView.events.filter( - eventOpt = Some("rate"), - startTimeOpt = Some(new DateTime(1998, 1, 1, 0, 0)) - // untilTimeOpt = Some(new DateTime(1997, 1, 1, 0, 0)) - ) - // untilTimeOpt = Some(new DateTime(2000, 1, 1, 0, 0))) - - e.foreach { println } - println() - println() - println() - val u = batchView.aggregateProperties("pio_item") - u.foreach { println } - println() - println() - println() - - // val l: Seq[Event] = e - val l = e.map { _.entityId } - l.foreach { println } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala ---------------------------------------------------------------------- diff 
--git a/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala b/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala deleted file mode 100644 index 0b64afb..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.webhooks - -/** Webhooks Connnector Exception - * - * @param message the detail message - * @param cause the cause - */ -private[prediction] class ConnectorException(message: String, cause: Throwable) - extends Exception(message, cause) { - - /** Webhooks Connnector Exception with cause being set to null - * - * @param message the detail message - */ - def this(message: String) = this(message, null) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala b/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala deleted file mode 100644 index 424b6ba..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala +++ /dev/null @@ -1,46 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.webhooks - -import io.prediction.data.storage.Event -import io.prediction.data.storage.EventJson4sSupport - -import org.json4s.Formats -import org.json4s.DefaultFormats -import org.json4s.JObject -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write - - -private[prediction] object ConnectorUtil { - - implicit val eventJson4sFormats: Formats = DefaultFormats + - new EventJson4sSupport.APISerializer - - // intentionally use EventJson4sSupport.APISerializer to convert - // from JSON to Event object. 
Don't allow connector directly create - // Event object so that the Event object formation is consistent - // by enforcing JSON format - - def toEvent(connector: JsonConnector, data: JObject): Event = { - read[Event](write(connector.toEventJson(data))) - } - - def toEvent(connector: FormConnector, data: Map[String, String]): Event = { - read[Event](write(connector.toEventJson(data))) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala deleted file mode 100644 index 9087f31..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala +++ /dev/null @@ -1,32 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.webhooks - -import org.json4s.JObject - -/** Connector for Webhooks connection with Form submission data format - */ -private[prediction] trait FormConnector { - - // TODO: support conversion to multiple events? 
- - /** Convert from original Form submission data to Event JObject - * @param data Map of key-value pairs in String type received through webhooks - * @return Event JObject - */ - def toEventJson(data: Map[String, String]): JObject - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala deleted file mode 100644 index e0e80fe..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.webhooks - -import org.json4s.JObject - -/** Connector for Webhooks connection */ -private[prediction] trait JsonConnector { - - // TODO: support conversion to multiple events? 
- - /** Convert from original JObject to Event JObject - * @param data original JObject recevived through webhooks - * @return Event JObject - */ - def toEventJson(data: JObject): JObject - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala deleted file mode 100644 index f19e009..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala +++ /dev/null @@ -1,123 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.data.webhooks.exampleform - -import io.prediction.data.webhooks.FormConnector -import io.prediction.data.webhooks.ConnectorException - -import org.json4s.JObject - - -/** Example FormConnector with following types of webhook form data inputs: - * - * UserAction - * - * "type"="userAction" - * "userId"="as34smg4", - * "event"="do_something", - * "context[ip]"="24.5.68.47", // optional - * "context[prop1]"="2.345", // optional - * "context[prop2]"="value1" // optional - * "anotherProperty1"="100", - * "anotherProperty2"="optional1", // optional - * "timestamp"="2015-01-02T00:30:12.984Z" - * - * UserActionItem - * - * "type"="userActionItem" - * "userId"="as34smg4", - * "event"="do_something_on", - * "itemId"="kfjd312bc", - * "context[ip]"="1.23.4.56", - * "context[prop1]"="2.345", - * "context[prop2]"="value1", - * "anotherPropertyA"="4.567", // optional - * "anotherPropertyB"="false", // optional - * "timestamp"="2015-01-15T04:20:23.567Z" - * - */ -private[prediction] object ExampleFormConnector extends FormConnector { - - override - def toEventJson(data: Map[String, String]): JObject = { - val json = try { - data.get("type") match { - case Some("userAction") => userActionToEventJson(data) - case Some("userActionItem") => userActionItemToEventJson(data) - case Some(x) => throw new ConnectorException( - s"Cannot convert unknown type ${x} to event JSON") - case None => throw new ConnectorException( - s"The field 'type' is required.") - } - } catch { - case e: ConnectorException => throw e - case e: Exception => throw new ConnectorException( - s"Cannot convert ${data} to event JSON. 
${e.getMessage()}", e) - } - json - } - - def userActionToEventJson(data: Map[String, String]): JObject = { - import org.json4s.JsonDSL._ - - // two level optional data - val context = if (data.exists(_._1.startsWith("context["))) { - Some( - ("ip" -> data.get("context[ip]")) ~ - ("prop1" -> data.get("context[prop1]").map(_.toDouble)) ~ - ("prop2" -> data.get("context[prop2]")) - ) - } else { - None - } - - val json = - ("event" -> data("event")) ~ - ("entityType" -> "user") ~ - ("entityId" -> data("userId")) ~ - ("eventTime" -> data("timestamp")) ~ - ("properties" -> ( - ("context" -> context) ~ - ("anotherProperty1" -> data("anotherProperty1").toInt) ~ - ("anotherProperty2" -> data.get("anotherProperty2")) - )) - json - } - - - def userActionItemToEventJson(data: Map[String, String]): JObject = { - import org.json4s.JsonDSL._ - - val json = - ("event" -> data("event")) ~ - ("entityType" -> "user") ~ - ("entityId" -> data("userId")) ~ - ("targetEntityType" -> "item") ~ - ("targetEntityId" -> data("itemId")) ~ - ("eventTime" -> data("timestamp")) ~ - ("properties" -> ( - ("context" -> ( - ("ip" -> data("context[ip]")) ~ - ("prop1" -> data("context[prop1]").toDouble) ~ - ("prop2" -> data("context[prop2]")) - )) ~ - ("anotherPropertyA" -> data.get("anotherPropertyA").map(_.toDouble)) ~ - ("anotherPropertyB" -> data.get("anotherPropertyB").map(_.toBoolean)) - )) - json - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala deleted file mode 100644 index 4d4b991..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala +++ /dev/null @@ -1,153 +0,0 @@ -/** 
Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.webhooks.examplejson - -import io.prediction.data.webhooks.JsonConnector -import io.prediction.data.webhooks.ConnectorException - -import org.json4s.Formats -import org.json4s.DefaultFormats -import org.json4s.JObject - -/** Example JsonConnector with following types of webhooks JSON input: - * - * UserAction - * - * { - * "type": "userAction" - * "userId": "as34smg4", - * "event": "do_something", - * "context": { - * "ip": "24.5.68.47", - * "prop1": 2.345, - * "prop2": "value1" - * }, - * "anotherProperty1": 100, - * "anotherProperty2": "optional1", - * "timestamp": "2015-01-02T00:30:12.984Z" - * } - * - * UserActionItem - * - * { - * "type": "userActionItem" - * "userId": "as34smg4", - * "event": "do_something_on", - * "itemId": "kfjd312bc", - * "context": { - * "ip": "1.23.4.56", - * "prop1": 2.345, - * "prop2": "value1" - * }, - * "anotherPropertyA": 4.567, - * "anotherPropertyB": false, - * "timestamp": "2015-01-15T04:20:23.567Z" - * } - */ -private[prediction] object ExampleJsonConnector extends JsonConnector { - - implicit val json4sFormats: Formats = DefaultFormats - - override def toEventJson(data: JObject): JObject = { - val common = try { - data.extract[Common] - } catch { - case e: Exception => throw new ConnectorException( - s"Cannot extract Common field from ${data}. 
${e.getMessage()}", e) - } - - val json = try { - common.`type` match { - case "userAction" => - toEventJson(common = common, userAction = data.extract[UserAction]) - case "userActionItem" => - toEventJson(common = common, userActionItem = data.extract[UserActionItem]) - case x: String => - throw new ConnectorException( - s"Cannot convert unknown type '${x}' to Event JSON.") - } - } catch { - case e: ConnectorException => throw e - case e: Exception => throw new ConnectorException( - s"Cannot convert ${data} to eventJson. ${e.getMessage()}", e) - } - - json - } - - def toEventJson(common: Common, userAction: UserAction): JObject = { - import org.json4s.JsonDSL._ - - // map to EventAPI JSON - val json = - ("event" -> userAction.event) ~ - ("entityType" -> "user") ~ - ("entityId" -> userAction.userId) ~ - ("eventTime" -> userAction.timestamp) ~ - ("properties" -> ( - ("context" -> userAction.context) ~ - ("anotherProperty1" -> userAction.anotherProperty1) ~ - ("anotherProperty2" -> userAction.anotherProperty2) - )) - json - } - - def toEventJson(common: Common, userActionItem: UserActionItem): JObject = { - import org.json4s.JsonDSL._ - - // map to EventAPI JSON - val json = - ("event" -> userActionItem.event) ~ - ("entityType" -> "user") ~ - ("entityId" -> userActionItem.userId) ~ - ("targetEntityType" -> "item") ~ - ("targetEntityId" -> userActionItem.itemId) ~ - ("eventTime" -> userActionItem.timestamp) ~ - ("properties" -> ( - ("context" -> userActionItem.context) ~ - ("anotherPropertyA" -> userActionItem.anotherPropertyA) ~ - ("anotherPropertyB" -> userActionItem.anotherPropertyB) - )) - json - } - - // Common required fields - case class Common( - `type`: String - ) - - // User Actions fields - case class UserAction ( - userId: String, - event: String, - context: Option[JObject], - anotherProperty1: Int, - anotherProperty2: Option[String], - timestamp: String - ) - - // UserActionItem fields - case class UserActionItem ( - userId: String, - event: String, - 
itemId: String, - context: JObject, - anotherPropertyA: Option[Double], - anotherPropertyB: Option[Boolean], - timestamp: String - ) - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala deleted file mode 100644 index b2793a0..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala +++ /dev/null @@ -1,305 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package io.prediction.data.webhooks.mailchimp - -import io.prediction.data.webhooks.FormConnector -import io.prediction.data.webhooks.ConnectorException -import io.prediction.data.storage.EventValidation -import io.prediction.data.Utils - -import org.json4s.JObject - -import org.joda.time.DateTime -import org.joda.time.format.DateTimeFormat - -private[prediction] object MailChimpConnector extends FormConnector { - - override - def toEventJson(data: Map[String, String]): JObject = { - - val json = data.get("type") match { - case Some("subscribe") => subscribeToEventJson(data) - // UNSUBSCRIBE - case Some("unsubscribe") => unsubscribeToEventJson(data) - // PROFILE UPDATES - case Some("profile") => profileToEventJson(data) - // EMAIL UPDATE - case Some("upemail") => upemailToEventJson(data) - // CLEANED EMAILS - case Some("cleaned") => cleanedToEventJson(data) - // CAMPAIGN SENDING STATUS - case Some("campaign") => campaignToEventJson(data) - // invalid type - case Some(x) => throw new ConnectorException( - s"Cannot convert unknown MailChimp data type ${x} to event JSON") - case None => throw new ConnectorException( - s"The field 'type' is required for MailChimp data.") - } - json - } - - - val mailChimpDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") - .withZone(EventValidation.defaultTimeZone) - - def parseMailChimpDateTime(s: String): DateTime = { - mailChimpDateTimeFormat.parseDateTime(s) - } - - def subscribeToEventJson(data: Map[String, String]): JObject = { - - import org.json4s.JsonDSL._ - - /* - "type": "subscribe", - "fired_at": "2009-03-26 21:35:57", - "data[id]": "8a25ff1d98", - "data[list_id]": "a6b5da1054", - "data[email]": "[email protected]", - "data[email_type]": "html", - "data[merges][EMAIL]": "[email protected]", - "data[merges][FNAME]": "MailChimp", - "data[merges][LNAME]": "API", - "data[merges][INTERESTS]": "Group1,Group2", - "data[ip_opt]": "10.20.10.30", - "data[ip_signup]": "10.20.10.30" - */ - - // convert to 
ISO8601 format - val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) - - // TODO: handle optional fields - val json = - ("event" -> "subscribe") ~ - ("entityType" -> "user") ~ - ("entityId" -> data("data[id]")) ~ - ("targetEntityType" -> "list") ~ - ("targetEntityId" -> data("data[list_id]")) ~ - ("eventTime" -> eventTime) ~ - ("properties" -> ( - ("email" -> data("data[email]")) ~ - ("email_type" -> data("data[email_type]")) ~ - ("merges" -> ( - ("EMAIL" -> data("data[merges][EMAIL]")) ~ - ("FNAME" -> data("data[merges][FNAME]"))) ~ - ("LNAME" -> data("data[merges][LNAME]")) ~ - ("INTERESTS" -> data.get("data[merges][INTERESTS]")) - )) ~ - ("ip_opt" -> data("data[ip_opt]")) ~ - ("ip_signup" -> data("data[ip_signup]") - )) - - json - - } - - def unsubscribeToEventJson(data: Map[String, String]): JObject = { - - import org.json4s.JsonDSL._ - - /* - "action" will either be "unsub" or "delete". - The reason will be "manual" unless caused by a spam complaint - then it will be "abuse" - - "type": "unsubscribe", - "fired_at": "2009-03-26 21:40:57", - "data[action]": "unsub", - "data[reason]": "manual", - "data[id]": "8a25ff1d98", - "data[list_id]": "a6b5da1054", - "data[email]": "[email protected]", - "data[email_type]": "html", - "data[merges][EMAIL]": "[email protected]", - "data[merges][FNAME]": "MailChimp", - "data[merges][LNAME]": "API", - "data[merges][INTERESTS]": "Group1,Group2", - "data[ip_opt]": "10.20.10.30", - "data[campaign_id]": "cb398d21d2", - */ - - // convert to ISO8601 format - val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) - - val json = - ("event" -> "unsubscribe") ~ - ("entityType" -> "user") ~ - ("entityId" -> data("data[id]")) ~ - ("targetEntityType" -> "list") ~ - ("targetEntityId" -> data("data[list_id]")) ~ - ("eventTime" -> eventTime) ~ - ("properties" -> ( - ("action" -> data("data[action]")) ~ - ("reason" -> data("data[reason]")) ~ - ("email" -> data("data[email]")) ~ - 
("email_type" -> data("data[email_type]")) ~ - ("merges" -> ( - ("EMAIL" -> data("data[merges][EMAIL]")) ~ - ("FNAME" -> data("data[merges][FNAME]"))) ~ - ("LNAME" -> data("data[merges][LNAME]")) ~ - ("INTERESTS" -> data.get("data[merges][INTERESTS]")) - )) ~ - ("ip_opt" -> data("data[ip_opt]")) ~ - ("campaign_id" -> data("data[campaign_id]") - )) - - json - - } - - def profileToEventJson(data: Map[String, String]): JObject = { - - import org.json4s.JsonDSL._ - - /* - "type": "profile", - "fired_at": "2009-03-26 21:31:21", - "data[id]": "8a25ff1d98", - "data[list_id]": "a6b5da1054", - "data[email]": "[email protected]", - "data[email_type]": "html", - "data[merges][EMAIL]": "[email protected]", - "data[merges][FNAME]": "MailChimp", - "data[merges][LNAME]": "API", - "data[merges][INTERESTS]": "Group1,Group2", \\OPTIONAL - "data[ip_opt]": "10.20.10.30" - */ - - // convert to ISO8601 format - val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) - - val json = - ("event" -> "profile") ~ - ("entityType" -> "user") ~ - ("entityId" -> data("data[id]")) ~ - ("targetEntityType" -> "list") ~ - ("targetEntityId" -> data("data[list_id]")) ~ - ("eventTime" -> eventTime) ~ - ("properties" -> ( - ("email" -> data("data[email]")) ~ - ("email_type" -> data("data[email_type]")) ~ - ("merges" -> ( - ("EMAIL" -> data("data[merges][EMAIL]")) ~ - ("FNAME" -> data("data[merges][FNAME]"))) ~ - ("LNAME" -> data("data[merges][LNAME]")) ~ - ("INTERESTS" -> data.get("data[merges][INTERESTS]")) - )) ~ - ("ip_opt" -> data("data[ip_opt]") - )) - - json - - } - - def upemailToEventJson(data: Map[String, String]): JObject = { - - import org.json4s.JsonDSL._ - - /* - "type": "upemail", - "fired_at": "2009-03-26 22:15:09", - "data[list_id]": "a6b5da1054", - "data[new_id]": "51da8c3259", - "data[new_email]": "[email protected]", - "data[old_email]": "[email protected]" - */ - - // convert to ISO8601 format - val eventTime = 
Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) - - val json = - ("event" -> "upemail") ~ - ("entityType" -> "user") ~ - ("entityId" -> data("data[new_id]")) ~ - ("targetEntityType" -> "list") ~ - ("targetEntityId" -> data("data[list_id]")) ~ - ("eventTime" -> eventTime) ~ - ("properties" -> ( - ("new_email" -> data("data[new_email]")) ~ - ("old_email" -> data("data[old_email]")) - )) - - json - - } - - def cleanedToEventJson(data: Map[String, String]): JObject = { - - import org.json4s.JsonDSL._ - - /* - Reason will be one of "hard" (for hard bounces) or "abuse" - "type": "cleaned", - "fired_at": "2009-03-26 22:01:00", - "data[list_id]": "a6b5da1054", - "data[campaign_id]": "4fjk2ma9xd", - "data[reason]": "hard", - "data[email]": "[email protected]" - */ - - // convert to ISO8601 format - val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) - - val json = - ("event" -> "cleaned") ~ - ("entityType" -> "list") ~ - ("entityId" -> data("data[list_id]")) ~ - ("eventTime" -> eventTime) ~ - ("properties" -> ( - ("campaignId" -> data("data[campaign_id]")) ~ - ("reason" -> data("data[reason]")) ~ - ("email" -> data("data[email]")) - )) - - json - - } - - def campaignToEventJson(data: Map[String, String]): JObject = { - - import org.json4s.JsonDSL._ - - /* - "type": "campaign", - "fired_at": "2009-03-26 21:31:21", - "data[id]": "5aa2102003", - "data[subject]": "Test Campaign Subject", - "data[status]": "sent", - "data[reason]": "", - "data[list_id]": "a6b5da1054" - */ - - // convert to ISO8601 format - val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at"))) - - val json = - ("event" -> "campaign") ~ - ("entityType" -> "campaign") ~ - ("entityId" -> data("data[id]")) ~ - ("targetEntityType" -> "list") ~ - ("targetEntityId" -> data("data[list_id]")) ~ - ("eventTime" -> eventTime) ~ - ("properties" -> ( - ("subject" -> data("data[subject]")) ~ - ("status" -> data("data[status]")) ~ - ("reason" -> 
data("data[reason]")) - )) - - json - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala deleted file mode 100644 index 318043c..0000000 --- a/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala +++ /dev/null @@ -1,306 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.data.webhooks.segmentio - -import io.prediction.data.webhooks.{ConnectorException, JsonConnector} -import org.json4s._ - -private[prediction] object SegmentIOConnector extends JsonConnector { - - // private lazy val supportedAPI = Vector("2", "2.0", "2.0.0") - - implicit val json4sFormats: Formats = DefaultFormats - - override - def toEventJson(data: JObject): JObject = { - try { - val version: String = data.values("version").toString -/* - if (!supportedAPI.contains(version)) { - throw new ConnectorException( - s"Supported segment.io API versions: [2]. 
got [$version]" - ) - } -*/ - } catch { case _: Throwable ⇒ - throw new ConnectorException(s"Failed to get segment.io API version.") - } - - val common = try { - data.extract[Common] - } catch { - case e: Throwable ⇒ throw new ConnectorException( - s"Cannot extract Common field from $data. ${e.getMessage}", e - ) - } - - try { - common.`type` match { - case "identify" ⇒ - toEventJson( - common = common, - identify = data.extract[Events.Identify] - ) - - case "track" ⇒ - toEventJson( - common = common, - track = data.extract[Events.Track] - ) - - case "alias" ⇒ - toEventJson( - common = common, - alias = data.extract[Events.Alias] - ) - - case "page" ⇒ - toEventJson( - common = common, - page = data.extract[Events.Page] - ) - - case "screen" ⇒ - toEventJson( - common = common, - screen = data.extract[Events.Screen] - ) - - case "group" ⇒ - toEventJson( - common = common, - group = data.extract[Events.Group] - ) - - case _ ⇒ - throw new ConnectorException( - s"Cannot convert unknown type ${common.`type`} to event JSON." - ) - } - } catch { - case e: ConnectorException => throw e - case e: Exception => - throw new ConnectorException( - s"Cannot convert $data to event JSON. 
${e.getMessage}", e - ) - } - } - - def toEventJson(common: Common, identify: Events.Identify ): JObject = { - import org.json4s.JsonDSL._ - val eventProperties = "traits" ⇒ identify.traits - toJson(common, eventProperties) - } - - def toEventJson(common: Common, track: Events.Track): JObject = { - import org.json4s.JsonDSL._ - val eventProperties = - ("properties" ⇒ track.properties) ~ - ("event" ⇒ track.event) - toJson(common, eventProperties) - } - - def toEventJson(common: Common, alias: Events.Alias): JObject = { - import org.json4s.JsonDSL._ - toJson(common, "previous_id" ⇒ alias.previous_id) - } - - def toEventJson(common: Common, screen: Events.Screen): JObject = { - import org.json4s.JsonDSL._ - val eventProperties = - ("name" ⇒ screen.name) ~ - ("properties" ⇒ screen.properties) - toJson(common, eventProperties) - } - - def toEventJson(common: Common, page: Events.Page): JObject = { - import org.json4s.JsonDSL._ - val eventProperties = - ("name" ⇒ page.name) ~ - ("properties" ⇒ page.properties) - toJson(common, eventProperties) - } - - def toEventJson(common: Common, group: Events.Group): JObject = { - import org.json4s.JsonDSL._ - val eventProperties = - ("group_id" ⇒ group.group_id) ~ - ("traits" ⇒ group.traits) - toJson(common, eventProperties) - } - - private def toJson(common: Common, props: JObject): JsonAST.JObject = { - val commonFields = commonToJson(common) - JObject(("properties" ⇒ properties(common, props)) :: commonFields.obj) - } - - private def properties(common: Common, eventProps: JObject): JObject = { - import org.json4s.JsonDSL._ - common.context map { context ⇒ - try { - ("context" ⇒ Extraction.decompose(context)) ~ eventProps - } catch { - case e: Throwable ⇒ - throw new ConnectorException( - s"Cannot convert $context to event JSON. 
${e.getMessage }", e - ) - } - } - } getOrElse eventProps - } - - private def commonToJson(common: Common): JObject = - commonToJson(common, common.`type`) - - private def commonToJson(common: Common, typ: String): JObject = { - import org.json4s.JsonDSL._ - common.user_id.orElse(common.anonymous_id) match { - case Some(userId) ⇒ - ("event" ⇒ typ) ~ - ("entityType" ⇒ "user") ~ - ("entityId" ⇒ userId) ~ - ("eventTime" ⇒ common.timestamp) - - case None ⇒ - throw new ConnectorException( - "there was no `userId` or `anonymousId` in the common fields." - ) - } - } -} - -object Events { - - private[prediction] case class Track( - event: String, - properties: Option[JObject] = None - ) - - private[prediction] case class Alias(previous_id: String, user_id: String) - - private[prediction] case class Group( - group_id: String, - traits: Option[JObject] = None - ) - - private[prediction] case class Screen( - name: Option[String] = None, - properties: Option[JObject] = None - ) - - private[prediction] case class Page( - name: Option[String] = None, - properties: Option[JObject] = None - ) - - private[prediction] case class Identify( - user_id: String, - traits: Option[JObject] - ) - -} - -object Common { - - private[prediction] case class Integrations( - All: Boolean = false, - Mixpanel: Boolean = false, - Marketo: Boolean = false, - Salesforse: Boolean = false - ) - - private[prediction] case class Context( - ip: String, - library: Library, - user_agent: String, - app: Option[App] = None, - campaign: Option[Campaign] = None, - device: Option[Device] = None, - network: Option[Network] = None, - location: Option[Location] = None, - os: Option[OS] = None, - referrer: Option[Referrer] = None, - screen: Option[Screen] = None, - timezone: Option[String] = None - ) - - private[prediction] case class Screen(width: Int, height: Int, density: Int) - - private[prediction] case class Referrer(id: String, `type`: String) - - private[prediction] case class OS(name: String, version: String) - - 
private[prediction] case class Location( - city: Option[String] = None, - country: Option[String] = None, - latitude: Option[Double] = None, - longitude: Option[Double] = None, - speed: Option[Int] = None - ) - - case class Page( - path: String, - referrer: String, - search: String, - title: String, - url: String - ) - - private[prediction] case class Network( - bluetooth: Option[Boolean] = None, - carrier: Option[String] = None, - cellular: Option[Boolean] = None, - wifi: Option[Boolean] = None - ) - - private[prediction] case class Library(name: String, version: String) - - private[prediction] case class Device( - id: Option[String] = None, - advertising_id: Option[String] = None, - ad_tracking_enabled: Option[Boolean] = None, - manufacturer: Option[String] = None, - model: Option[String] = None, - name: Option[String] = None, - `type`: Option[String] = None, - token: Option[String] = None - ) - - private[prediction] case class Campaign( - name: Option[String] = None, - source: Option[String] = None, - medium: Option[String] = None, - term: Option[String] = None, - content: Option[String] = None - ) - - private[prediction] case class App( - name: Option[String] = None, - version: Option[String] = None, - build: Option[String] = None - ) - -} - -private[prediction] case class Common( - `type`: String, - sent_at: String, - timestamp: String, - version: String, - anonymous_id: Option[String] = None, - user_id: Option[String] = None, - context: Option[Common.Context] = None, - integrations: Option[Common.Integrations] = None -) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/Utils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/Utils.scala b/data/src/main/scala/org/apache/predictionio/data/Utils.scala new file mode 100644 index 0000000..db8c7a2 --- /dev/null +++ 
b/data/src/main/scala/org/apache/predictionio/data/Utils.scala @@ -0,0 +1,50 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data + +import org.joda.time.DateTime +import org.joda.time.format.ISODateTimeFormat + +import java.lang.IllegalArgumentException + +private[prediction] object Utils { + + // use dateTime() for strict ISO8601 format + val dateTimeFormatter = ISODateTimeFormat.dateTime().withOffsetParsed() + + val dateTimeNoMillisFormatter = + ISODateTimeFormat.dateTimeNoMillis().withOffsetParsed() + + def stringToDateTime(dt: String): DateTime = { + // We accept two formats. + // 1. "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" + // 2. "yyyy-MM-dd'T'HH:mm:ssZZ" + // The first one also takes milliseconds into account. + try { + // formatting for "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" + dateTimeFormatter.parseDateTime(dt) + } catch { + case e: IllegalArgumentException => { + // handle when the datetime string doesn't specify milliseconds. 
+ dateTimeNoMillisFormatter.parseDateTime(dt) + } + } + } + + def dateTimeToString(dt: DateTime): String = dateTimeFormatter.print(dt) + // dt.toString + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Common.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala new file mode 100644 index 0000000..c380daa --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala @@ -0,0 +1,80 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.webhooks.ConnectorException +import org.apache.predictionio.data.storage.StorageException + +import spray.routing._ +import spray.routing.Directives._ +import spray.routing.Rejection +import spray.http.StatusCodes +import spray.http.StatusCode +import spray.httpx.Json4sSupport + +import org.json4s.Formats +import org.json4s.DefaultFormats + +object Common { + + object Json4sProtocol extends Json4sSupport { + implicit def json4sFormats: Formats = DefaultFormats + } + + import Json4sProtocol._ + + val rejectionHandler = RejectionHandler { + case MalformedRequestContentRejection(msg, _) :: _ => + complete(StatusCodes.BadRequest, Map("message" -> msg)) + case MissingQueryParamRejection(msg) :: _ => + complete(StatusCodes.NotFound, + Map("message" -> s"missing required query parameter ${msg}.")) + case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => { + val msg = cause match { + case AuthenticationFailedRejection.CredentialsRejected => + "Invalid accessKey." + case AuthenticationFailedRejection.CredentialsMissing => + "Missing accessKey." 
+ } + complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg)) + } + case ChannelRejection(msg) :: _ => + complete(StatusCodes.Unauthorized, Map("message" -> msg)) + case NonExistentAppRejection(msg) :: _ => + complete(StatusCodes.Unauthorized, Map("message" -> msg)) + } + + val exceptionHandler = ExceptionHandler { + case e: ConnectorException => { + val msg = s"${e.getMessage()}" + complete(StatusCodes.BadRequest, Map("message" -> msg)) + } + case e: StorageException => { + val msg = s"${e.getMessage()}" + complete(StatusCodes.InternalServerError, Map("message" -> msg)) + } + case e: Exception => { + val msg = s"${e.getMessage()}" + complete(StatusCodes.InternalServerError, Map("message" -> msg)) + } + } +} + +/** invalid channel */ +case class ChannelRejection(msg: String) extends Rejection + +/** the app doesn't exist */ +case class NonExistentAppRejection(msg: String) extends Rejection http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala new file mode 100644 index 0000000..e25234f --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala @@ -0,0 +1,24 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.storage.Event + +case class EventInfo( + appId: Int, + channelId: Option[Int], + event: Event) +
