Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 1321f1ac7 -> 9b629800b


[PIO-45] Concat arrays in event properties on compress & don't time-window filter out set/unset events


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/9b629800
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/9b629800
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/9b629800

Branch: refs/heads/develop
Commit: 9b629800bb37bde47b9aed46c63b089b95ad39ca
Parents: 1321f1a
Author: EmergentOrder <[email protected]>
Authored: Mon Feb 6 10:51:48 2017 -0600
Committer: EmergentOrder <[email protected]>
Committed: Mon Feb 6 10:51:48 2017 -0600

----------------------------------------------------------------------
 .../core/SelfCleaningDataSource.scala           | 20 ++++++++++++++++----
 .../core/SelfCleaningDataSourceTest.scala       | 12 ++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/9b629800/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
index dc80566..51af64d 100644
--- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
+++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
@@ -24,6 +24,7 @@ import org.apache.predictionio.data.store.{Common, LEventStore, PEventStore}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.joda.time.DateTime
+import org.json4s._
 
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.{Await, Future}
@@ -74,13 +75,12 @@ trait SelfCleaningDataSource {
     */
   @DeveloperApi
   def getCleanedPEvents(pEvents: RDD[Event]): RDD[Event] = {
-
     eventWindow
       .flatMap(_.duration)
       .map { duration =>
         val fd = Duration(duration)
         pEvents.filter(e =>
-          e.eventTime.isAfter(DateTime.now().minus(fd.toMillis))
+          e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) || isSetEvent(e)
         )
       }.getOrElse(pEvents)
   }
@@ -99,7 +99,7 @@ trait SelfCleaningDataSource {
       .map { duration =>
         val fd = Duration(duration)
         lEvents.filter(e =>
-          e.eventTime.isAfter(DateTime.now().minus(fd.toMillis))
+          e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) || isSetEvent(e)
         )
       }.getOrElse(lEvents).toIterable
   }
@@ -300,7 +300,7 @@ trait SelfCleaningDataSource {
         events.reduce { (e1, e2) =>
           val props = e2.event match {
             case "$set" =>
-              e1.properties.fields ++ e2.properties.fields
+              e1.properties.fields ++ e2.properties.fields.map(concatJArrays(_,e1))
             case "$unset" =>
               e1.properties.fields
                 .filterKeys(f => !e2.properties.fields.contains(f))
@@ -317,6 +317,18 @@ trait SelfCleaningDataSource {
         }
     }
   }
+
+  private def concatJArrays(propAndEvent: (String, JValue), e1: Event): (String,JValue) =
+    propAndEvent match {
+      case (k,v) => k -> (v match {
+        case jArr: JArray =>
+          (JArray((jArr.arr ++ (e1.properties.fields.getOrElse(k,JNothing) match {
+            case jArr2: JArray => jArr2.arr
+            case _ => List()
+          })).distinct))
+        case _ => v
+      })
+    }
 }
 
 case class EventWindow(

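For readers skimming the diff: the compress step now merges array-valued properties from successive $set events instead of letting the newest event overwrite them. The following standalone sketch (illustrative names only, not the patched source; it assumes json4s on the classpath) shows the intended merge semantics:

import org.json4s._

object ArrayMergeSketch {
  // Merge an older and a newer "$set" property map: when both sides carry a
  // JArray under the same key, keep the deduplicated union of the two arrays;
  // for every other value the newer entry simply wins.
  def mergeSetProps(oldProps: Map[String, JValue],
                    newProps: Map[String, JValue]): Map[String, JValue] =
    oldProps ++ newProps.map { case (k, v) =>
      k -> ((v, oldProps.getOrElse(k, JNothing)) match {
        case (JArray(newArr), JArray(oldArr)) => JArray((newArr ++ oldArr).distinct)
        case _                                => v
      })
    }

  def main(args: Array[String]): Unit = {
    val older = Map("categories" -> JArray(List(JString("Tablets"), JString("Electronics"))))
    val newer = Map("categories" -> JArray(List(JString("Google"), JString("Electronics"))))
    // Prints one deduplicated array: Google, Electronics, Tablets
    println(mergeSetProps(older, newer))
  }
}

The patch applies the same per-key rule through the new concatJArrays helper in the hunk above.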
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/9b629800/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
index 1aed1ba..484acc4 100644
--- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
+++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
@@ -33,6 +33,9 @@ import org.apache.predictionio.data.store._
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 
+import org.json4s._
+import org.json4s.DefaultFormats
+
 import org.apache.spark.rdd.RDD
 import org.scalatest.Inspectors._
 import org.scalatest.Matchers._
@@ -79,7 +82,16 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo
 
     val nexusSet = eventsAfter.filter(x => x.event == "$set" && x.entityId == "Nexus").take(1)(0)
 
+    implicit val formats = DefaultFormats
+
     nexusSet.properties.get[String]("available") should equal ("2016-03-18T13:31:49.016770+00:00")
+
+    nexusSet.properties.get[JArray]("categories").values should equal (
+                   JArray(
+                     List(JString("Tablets"),
+                          JString("Electronics"),
+                          JString("Google"),
+                          JString("Google2"))).values)
  
     distinctEventsAfterCount should equal (eventsAfterCount)
     eventsBeforeCount should be > (eventsAfterCount) 

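The other half of the change, per the commit title, is that the time-window filter no longer drops $set/$unset events, so they remain available for compression. A minimal sketch of that predicate follows (illustrative names; that the check covers both $set and $unset is inferred from the commit title, not from this diff; joda-time and scala.concurrent.duration are assumed, as in the patched source):

import org.joda.time.DateTime
import scala.concurrent.duration.Duration

object WindowFilterSketch {
  // Keep an event if it falls inside the retention window, or unconditionally
  // if it is a "$set"/"$unset" event that later compression still needs.
  def keep(eventName: String, eventTime: DateTime, window: Duration): Boolean = {
    val isSetOrUnset = eventName == "$set" || eventName == "$unset"
    eventTime.isAfter(DateTime.now().minus(window.toMillis)) || isSetOrUnset
  }

  def main(args: Array[String]): Unit = {
    val window = Duration("30 days")
    val oldTime = DateTime.now().minusDays(90)
    // An old "$set" event is kept; an equally old "view" event is dropped.
    println(keep("$set", oldTime, window))   // true
    println(keep("view", oldTime, window))   // false
  }
}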