Put back to the right place
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/f9283330
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/f9283330
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/f9283330

Branch: refs/heads/feature/xbuild
Commit: f92833304a7bfc467a9143c9274ac4ee8022462c
Parents: ab11980
Author: Donald Szeto <[email protected]>
Authored: Thu Mar 16 00:15:39 2017 -0700
Committer: Donald Szeto <[email protected]>
Committed: Thu Mar 16 00:15:39 2017 -0700

----------------------------------------------------------------------
 .../predictionio/data/view/PBatchView.scala | 204 ++++++++++++++++++
 .../predictionio/data/view/PBatchView.scala | 212 -------------------
 2 files changed, 204 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/f9283330/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
new file mode 100644
index 0000000..cbdebce
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.{DataMap, Event, EventValidation, Storage}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.joda.time.DateTime
+import org.json4s.JValue
+
+
+// each JValue data associated with the time it is set
+private[predictionio] case class PropTime(val d: JValue, val t: Long) extends Serializable
+
+private[predictionio] case class SetProp (
+  val fields: Map[String, PropTime],
+  // last set time. Note: fields could be empty with valid set time
+  val t: Long) extends Serializable {
+
+  def ++ (that: SetProp): SetProp = {
+    val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+    val common: Map[String, PropTime] = commonKeys.map { k =>
+      val thisData = this.fields(k)
+      val thatData = that.fields(k)
+      // only keep the value with latest time
+      val v = if (thisData.t > thatData.t) thisData else thatData
+      (k, v)
+    }.toMap
+
+    val combinedFields = common ++
+      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+    // keep the latest set time
+    val combinedT = if (this.t > that.t) this.t else that.t
+
+    SetProp(
+      fields = combinedFields,
+      t = combinedT
+    )
+  }
+}
+
+private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
+  def ++ (that: UnsetProp): UnsetProp = {
+    val commonKeys = fields.keySet.intersect(that.fields.keySet)
+
+    val common: Map[String, Long] = commonKeys.map { k =>
+      val thisData = this.fields(k)
+      val thatData = that.fields(k)
+      // only keep the value with latest time
+      val v = if (thisData > thatData) thisData else thatData
+      (k, v)
+    }.toMap
+
+    val combinedFields = common ++
+      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
+
+    UnsetProp(
+      fields = combinedFields
+    )
+  }
+}
+
+private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
+  def ++ (that: DeleteEntity): DeleteEntity = {
+    if (this.t > that.t) this else that
+  }
+}
+
+private[predictionio] case class EventOp (
+  val setProp: Option[SetProp] = None,
+  val unsetProp: Option[UnsetProp] = None,
+  val deleteEntity: Option[DeleteEntity] = None
+) extends Serializable {
+
+  def ++ (that: EventOp): EventOp = {
+    EventOp(
+      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
+    )
+  }
+
+  def toDataMap(): Option[DataMap] = {
+    setProp.flatMap { set =>
+
+      val unsetKeys: Set[String] = unsetProp.map( unset =>
+        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
+      ).getOrElse(Set())
+
+      val combinedFields = deleteEntity.map { delete =>
+        if (delete.t >= set.t) {
+          None
+        } else {
+          val deleteKeys: Set[String] = set.fields
+            .filter { case (k, PropTime(kv, t)) =>
+              (delete.t >= t)
+            }.keySet
+          Some(set.fields -- unsetKeys -- deleteKeys)
+        }
+      }.getOrElse{
+        Some(set.fields -- unsetKeys)
+      }
+
+      // Note: mapValues() doesn't return concrete Map and causes
+      // NotSerializableException issue. Use map(identity) to work around this.
+      // see https://issues.scala-lang.org/browse/SI-7005
+      combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
+    }
+  }
+
+}
+
+private[predictionio] object EventOp {
+  def apply(e: Event): EventOp = {
+    val t = e.eventTime.getMillis
+    e.event match {
+      case "$set" => {
+        val fields = e.properties.fields.mapValues(jv =>
+          PropTime(jv, t)
+        ).map(identity)
+
+        EventOp(
+          setProp = Some(SetProp(fields = fields, t = t))
+        )
+      }
+      case "$unset" => {
+        val fields = e.properties.fields.mapValues(jv => t).map(identity)
+        EventOp(
+          unsetProp = Some(UnsetProp(fields = fields))
+        )
+      }
+      case "$delete" => {
+        EventOp(
+          deleteEntity = Some(DeleteEntity(t))
+        )
+      }
+      case _ => {
+        EventOp()
+      }
+    }
+  }
+}
+
+@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
+class PBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime],
+  val sc: SparkContext) {
+
+  // NOTE: parallel Events DB interface
+  @transient lazy val eventsDb = Storage.getPEvents()
+
+  @transient lazy val _events: RDD[Event] =
+    eventsDb.getByAppIdAndTimeAndEntity(
+      appId = appId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = None,
+      entityId = None)(sc)
+
+  // TODO: change to use EventSeq?
+  @transient lazy val events: RDD[Event] = _events
+
+  def aggregateProperties(
+    entityType: String,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None
+  ): RDD[(String, DataMap)] = {
+
+    _events
+      .filter( e => ((e.entityType == entityType) &&
+        (EventValidation.isSpecialEvents(e.event))) )
+      .map( e => (e.entityId, EventOp(e) ))
+      .aggregateByKey[EventOp](EventOp())(
+        // within same partition
+        seqOp = { case (u, v) => u ++ v },
+        // across partition
+        combOp = { case (accu, u) => accu ++ u }
+      )
+      .mapValues(_.toDataMap)
+      .filter{ case (k, v) => v.isDefined }
+      .map{ case (k, v) => (k, v.get) }
+  }
+
+}
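
The map(identity) calls above work around SI-7005: on the Scala versions this code targeted (2.10/2.11), Map#mapValues returns a lazy view over the original map rather than a concrete Map, and that view is not java.io.Serializable, so capturing it in a Spark closure fails at task serialization time. A minimal standalone sketch of the failure mode, independent of PredictionIO (object and helper names here are illustrative only):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object MapValuesSerializationSketch {

  // Returns true if the object survives Java serialization.
  def serializes(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val m = Map("a" -> 1, "b" -> 2)

    // mapValues returns a lazy view over the original map, not a concrete Map.
    val view = m.mapValues(_ + 1)

    // map(identity) forces materialization into a concrete immutable.Map,
    // which is Serializable.
    val concrete = m.mapValues(_ + 1).map(identity)

    println(s"view serializes:     ${serializes(view)}")     // false on 2.10/2.11
    println(s"concrete serializes: ${serializes(concrete)}") // true
  }
}
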
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/f9283330/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
deleted file mode 100644
index b453820..0000000
--- a/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.view
-
-import org.apache.predictionio.data.storage.hbase.HBPEvents
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventValidation
-import org.apache.predictionio.data.storage.DataMap
-import org.apache.predictionio.data.storage.Storage
-
-import org.joda.time.DateTime
-
-import org.json4s.JValue
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-// each JValue data associated with the time it is set
-private[predictionio] case class PropTime(val d: JValue, val t: Long) extends Serializable
-
-private[predictionio] case class SetProp (
-  val fields: Map[String, PropTime],
-  // last set time. Note: fields could be empty with valid set time
-  val t: Long) extends Serializable {
-
-  def ++ (that: SetProp): SetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, PropTime] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData.t > thatData.t) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    // keep the latest set time
-    val combinedT = if (this.t > that.t) this.t else that.t
-
-    SetProp(
-      fields = combinedFields,
-      t = combinedT
-    )
-  }
-}
-
-private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
-  def ++ (that: UnsetProp): UnsetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, Long] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData > thatData) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    UnsetProp(
-      fields = combinedFields
-    )
-  }
-}
-
-private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
-  def ++ (that: DeleteEntity): DeleteEntity = {
-    if (this.t > that.t) this else that
-  }
-}
-
-private[predictionio] case class EventOp (
-  val setProp: Option[SetProp] = None,
-  val unsetProp: Option[UnsetProp] = None,
-  val deleteEntity: Option[DeleteEntity] = None
-) extends Serializable {
-
-  def ++ (that: EventOp): EventOp = {
-    EventOp(
-      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
-      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
-      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
-    )
-  }
-
-  def toDataMap(): Option[DataMap] = {
-    setProp.flatMap { set =>
-
-      val unsetKeys: Set[String] = unsetProp.map( unset =>
-        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
-      ).getOrElse(Set())
-
-      val combinedFields = deleteEntity.map { delete =>
-        if (delete.t >= set.t) {
-          None
-        } else {
-          val deleteKeys: Set[String] = set.fields
-            .filter { case (k, PropTime(kv, t)) =>
-              (delete.t >= t)
-            }.keySet
-          Some(set.fields -- unsetKeys -- deleteKeys)
-        }
-      }.getOrElse{
-        Some(set.fields -- unsetKeys)
-      }
-
-      // Note: mapValues() doesn't return concrete Map and causes
-      // NotSerializableException issue. Use map(identity) to work around this.
-      // see https://issues.scala-lang.org/browse/SI-7005
-      combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
-    }
-  }
-
-}
-
-private[predictionio] object EventOp {
-  def apply(e: Event): EventOp = {
-    val t = e.eventTime.getMillis
-    e.event match {
-      case "$set" => {
-        val fields = e.properties.fields.mapValues(jv =>
-          PropTime(jv, t)
-        ).map(identity)
-
-        EventOp(
-          setProp = Some(SetProp(fields = fields, t = t))
-        )
-      }
-      case "$unset" => {
-        val fields = e.properties.fields.mapValues(jv => t).map(identity)
-        EventOp(
-          unsetProp = Some(UnsetProp(fields = fields))
-        )
-      }
-      case "$delete" => {
-        EventOp(
-          deleteEntity = Some(DeleteEntity(t))
-        )
-      }
-      case _ => {
-        EventOp()
-      }
-    }
-  }
-}
-
-@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
-class PBatchView(
-  val appId: Int,
-  val startTime: Option[DateTime],
-  val untilTime: Option[DateTime],
-  val sc: SparkContext) {
-
-  // NOTE: parallel Events DB interface
-  @transient lazy val eventsDb = Storage.getPEvents()
-
-  @transient lazy val _events: RDD[Event] =
-    eventsDb.getByAppIdAndTimeAndEntity(
-      appId = appId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = None,
-      entityId = None)(sc)
-
-  // TODO: change to use EventSeq?
-  @transient lazy val events: RDD[Event] = _events
-
-  def aggregateProperties(
-    entityType: String,
-    startTimeOpt: Option[DateTime] = None,
-    untilTimeOpt: Option[DateTime] = None
-  ): RDD[(String, DataMap)] = {
-
-    _events
-      .filter( e => ((e.entityType == entityType) &&
-        (EventValidation.isSpecialEvents(e.event))) )
-      .map( e => (e.entityId, EventOp(e) ))
-      .aggregateByKey[EventOp](EventOp())(
-        // within same partition
-        seqOp = { case (u, v) => u ++ v },
-        // across partition
-        combOp = { case (accu, u) => accu ++ u }
-      )
-      .mapValues(_.toDataMap)
-      .filter{ case (k, v) => v.isDefined }
-      .map{ case (k, v) => (k, v.get) }
-  }
-
-}
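
As the @deprecated annotation notes, PBatchView is superseded by PEvents/PEventStore. A rough sketch of the equivalent property aggregation through the replacement API, assuming a release where org.apache.predictionio.data.store.PEventStore is available; the app name and entity type below are placeholders:

import org.apache.predictionio.data.store.PEventStore
import org.apache.predictionio.data.storage.PropertyMap
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object AggregatePropertiesSketch {
  // Roughly what new PBatchView(appId, None, None, sc).aggregateProperties("user")
  // computed, expressed against the replacement API. Note that PEventStore keys
  // on app name rather than app ID and yields PropertyMap instead of DataMap.
  def userProperties(sc: SparkContext): RDD[(String, PropertyMap)] =
    PEventStore.aggregateProperties(
      appName = "MyApp",   // placeholder: your app's name
      entityType = "user"  // placeholder: entity type to aggregate
    )(sc)
}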
