Repository: incubator-predictionio Updated Branches: refs/heads/develop bb964740c -> bab594a0b
[PIO-45] Fix compression for arrays in properties & ensure deduped events have latest event time Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bab594a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bab594a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bab594a0 Branch: refs/heads/develop Commit: bab594a0bd03a66dda32ed7480a23b4878f41345 Parents: bb96474 Author: EmergentOrder <[email protected]> Authored: Sun Feb 26 15:38:15 2017 -0600 Committer: EmergentOrder <[email protected]> Committed: Sun Feb 26 15:38:15 2017 -0600 ---------------------------------------------------------------------- .../core/SelfCleaningDataSource.scala | 26 ++++++-------------- .../core/SelfCleaningDataSourceTest.scala | 3 +-- 2 files changed, 8 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bab594a0/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala index 51af64d..acbd4a9 100644 --- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala +++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala @@ -127,7 +127,7 @@ trait SelfCleaningDataSource { def removePDuplicates(sc: SparkContext, rdd: RDD[Event]): RDD[Event] = { val now = DateTime.now() - rdd.map(x => + rdd.sortBy(_.eventTime, true).map(x => (recreateEvent(x, None, now), (x.eventId, x.eventTime))) .groupByKey .map{case (x, y) => recreateEvent(x, y.head._1, y.head._2)} @@ -144,7 +144,7 @@ trait SelfCleaningDataSource { def removeLDuplicates(ls: Iterable[Event]): Iterable[Event] = { val now = DateTime.now() - ls.toList.map(x => + ls.toList.reverse.map(x => (recreateEvent(x, None, now), (x.eventId, x.eventTime))) .groupBy(_._1).mapValues( _.map( _._2 ) ) .map(x => recreateEvent(x._1, x._2.head._1, x._2.head._2)) @@ -229,7 +229,7 @@ trait SelfCleaningDataSource { */ @DeveloperApi def cleanPEvents(sc: SparkContext): RDD[Event] = { - val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime, false) + val pEvents = getCleanedPEvents(PEventStore.find(appName)(sc).sortBy(_.eventTime, false)) val rdd = eventWindow match { case Some(ew) => @@ -241,7 +241,7 @@ trait SelfCleaningDataSource { case None => pEvents } - getCleanedPEvents(rdd) + rdd } /** :: DeveloperApi :: @@ -274,7 +274,7 @@ trait SelfCleaningDataSource { */ @DeveloperApi def cleanLEvents(): Iterable[Event] = { - val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime).reverse + val lEvents = getCleanedLEvents(LEventStore.find(appName).toList.sortBy(_.eventTime).reverse) val events = eventWindow match { case Some(ew) => @@ -285,7 +285,7 @@ trait SelfCleaningDataSource { case None => lEvents } - getCleanedLEvents(events) + events } @@ -300,7 +300,7 @@ trait SelfCleaningDataSource { events.reduce { (e1, e2) => val props = e2.event match { case "$set" => - e1.properties.fields ++ e2.properties.fields.map(concatJArrays(_,e1)) + e1.properties.fields ++ e2.properties.fields case "$unset" => e1.properties.fields .filterKeys(f => !e2.properties.fields.contains(f)) @@ -317,18 +317,6 @@ trait SelfCleaningDataSource { } } } - - private def concatJArrays(propAndEvent: (String, JValue), e1: Event): (String,JValue) = - propAndEvent match { - case (k,v) => k -> (v match { - case jArr: JArray => - (JArray((jArr.arr ++ (e1.properties.fields.getOrElse(k,JNothing) match { - case jArr2: JArray => jArr2.arr - case _ => List() - })).distinct)) - case _ => v - }) - } } case class EventWindow( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bab594a0/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala index 484acc4..55af708 100644 --- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala +++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala @@ -90,8 +90,7 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo JArray( List(JString("Tablets"), JString("Electronics"), - JString("Google"), - JString("Google2"))).values) + JString("Google"))).values) distinctEventsAfterCount should equal (eventsAfterCount) eventsBeforeCount should be > (eventsAfterCount)
