Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop bb964740c -> bab594a0b


[PIO-45] Fix compression for arrays in properties & ensure deduped events have latest event time


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bab594a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bab594a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bab594a0

Branch: refs/heads/develop
Commit: bab594a0bd03a66dda32ed7480a23b4878f41345
Parents: bb96474
Author: EmergentOrder <[email protected]>
Authored: Sun Feb 26 15:38:15 2017 -0600
Committer: EmergentOrder <[email protected]>
Committed: Sun Feb 26 15:38:15 2017 -0600

----------------------------------------------------------------------
 .../core/SelfCleaningDataSource.scala           | 26 ++++++--------------
 .../core/SelfCleaningDataSourceTest.scala       |  3 +--
 2 files changed, 8 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bab594a0/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
index 51af64d..acbd4a9 100644
--- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
+++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
@@ -127,7 +127,7 @@ trait SelfCleaningDataSource {
 
   def removePDuplicates(sc: SparkContext, rdd: RDD[Event]): RDD[Event] = {
     val now = DateTime.now()
-    rdd.map(x =>
+    rdd.sortBy(_.eventTime, true).map(x =>
       (recreateEvent(x, None, now), (x.eventId, x.eventTime)))
       .groupByKey
       .map{case (x, y) => recreateEvent(x, y.head._1, y.head._2)}
@@ -144,7 +144,7 @@ trait SelfCleaningDataSource {
 
   def removeLDuplicates(ls: Iterable[Event]): Iterable[Event] = {
     val now = DateTime.now()
-    ls.toList.map(x =>
+    ls.toList.reverse.map(x =>
       (recreateEvent(x, None, now), (x.eventId, x.eventTime)))
       .groupBy(_._1).mapValues( _.map( _._2 ) )
       .map(x => recreateEvent(x._1, x._2.head._1, x._2.head._2))
@@ -229,7 +229,7 @@ trait SelfCleaningDataSource {
     */
   @DeveloperApi
   def cleanPEvents(sc: SparkContext): RDD[Event] = {
-    val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime, false)
+    val pEvents = 
getCleanedPEvents(PEventStore.find(appName)(sc).sortBy(_.eventTime, false))
 
     val rdd = eventWindow match {
       case Some(ew) =>
@@ -241,7 +241,7 @@ trait SelfCleaningDataSource {
       case None =>
         pEvents
     }
-    getCleanedPEvents(rdd)
+    rdd
   }
 
   /** :: DeveloperApi ::
@@ -274,7 +274,7 @@ trait SelfCleaningDataSource {
     */
   @DeveloperApi
   def cleanLEvents(): Iterable[Event] = {
-    val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime).reverse
+    val lEvents = 
getCleanedLEvents(LEventStore.find(appName).toList.sortBy(_.eventTime).reverse)
 
     val events = eventWindow match {
       case Some(ew) =>
@@ -285,7 +285,7 @@ trait SelfCleaningDataSource {
       case None =>
         lEvents
     }
-    getCleanedLEvents(events)
+    events
   }
 
 
@@ -300,7 +300,7 @@ trait SelfCleaningDataSource {
         events.reduce { (e1, e2) =>
           val props = e2.event match {
             case "$set" =>
-              e1.properties.fields ++ 
e2.properties.fields.map(concatJArrays(_,e1))
+              e1.properties.fields ++ e2.properties.fields
             case "$unset" =>
               e1.properties.fields
                 .filterKeys(f => !e2.properties.fields.contains(f))
@@ -317,18 +317,6 @@ trait SelfCleaningDataSource {
         }
     }
   }
-
-  private def concatJArrays(propAndEvent: (String, JValue), e1: Event): 
(String,JValue) =
-    propAndEvent match {
-      case (k,v) => k -> (v match {
-        case jArr: JArray =>
-          (JArray((jArr.arr ++ (e1.properties.fields.getOrElse(k,JNothing) 
match {
-            case jArr2: JArray => jArr2.arr
-            case _ => List()
-          })).distinct))
-        case _ => v
-      })
-    }
 }
 
 case class EventWindow(

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bab594a0/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
index 484acc4..55af708 100644
--- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
+++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
@@ -90,8 +90,7 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo
                    JArray(
                      List(JString("Tablets"),
                           JString("Electronics"),
-                          JString("Google"),
-                          JString("Google2"))).values)
+                          JString("Google"))).values)
  
     distinctEventsAfterCount should equal (eventsAfterCount)
     eventsBeforeCount should be > (eventsAfterCount) 

Reply via email to