Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 1321f1ac7 -> 9b629800b
[PIO-45] Concat arrays in event properties on compress & don't time window filter out set/unset events Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/9b629800 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/9b629800 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/9b629800 Branch: refs/heads/develop Commit: 9b629800bb37bde47b9aed46c63b089b95ad39ca Parents: 1321f1a Author: EmergentOrder <[email protected]> Authored: Mon Feb 6 10:51:48 2017 -0600 Committer: EmergentOrder <[email protected]> Committed: Mon Feb 6 10:51:48 2017 -0600 ---------------------------------------------------------------------- .../core/SelfCleaningDataSource.scala | 20 ++++++++++++++++---- .../core/SelfCleaningDataSourceTest.scala | 12 ++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/9b629800/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala index dc80566..51af64d 100644 --- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala +++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala @@ -24,6 +24,7 @@ import org.apache.predictionio.data.store.{Common, LEventStore, PEventStore} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.joda.time.DateTime +import org.json4s._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} @@ -74,13 +75,12 @@ trait SelfCleaningDataSource { */ @DeveloperApi def 
getCleanedPEvents(pEvents: RDD[Event]): RDD[Event] = { - eventWindow .flatMap(_.duration) .map { duration => val fd = Duration(duration) pEvents.filter(e => - e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) + e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) || isSetEvent(e) ) }.getOrElse(pEvents) } @@ -99,7 +99,7 @@ trait SelfCleaningDataSource { .map { duration => val fd = Duration(duration) lEvents.filter(e => - e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) + e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) || isSetEvent(e) ) }.getOrElse(lEvents).toIterable } @@ -300,7 +300,7 @@ trait SelfCleaningDataSource { events.reduce { (e1, e2) => val props = e2.event match { case "$set" => - e1.properties.fields ++ e2.properties.fields + e1.properties.fields ++ e2.properties.fields.map(concatJArrays(_,e1)) case "$unset" => e1.properties.fields .filterKeys(f => !e2.properties.fields.contains(f)) @@ -317,6 +317,18 @@ trait SelfCleaningDataSource { } } } + + private def concatJArrays(propAndEvent: (String, JValue), e1: Event): (String,JValue) = + propAndEvent match { + case (k,v) => k -> (v match { + case jArr: JArray => + (JArray((jArr.arr ++ (e1.properties.fields.getOrElse(k,JNothing) match { + case jArr2: JArray => jArr2.arr + case _ => List() + })).distinct)) + case _ => v + }) + } } case class EventWindow( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/9b629800/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala index 1aed1ba..484acc4 100644 --- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala +++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala @@ -33,6 +33,9 @@ import 
org.apache.predictionio.data.store._ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ +import org.json4s._ +import org.json4s.DefaultFormats + import org.apache.spark.rdd.RDD import org.scalatest.Inspectors._ import org.scalatest.Matchers._ @@ -79,7 +82,16 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo val nexusSet = eventsAfter.filter(x => x.event == "$set" && x.entityId == "Nexus").take(1)(0) + implicit val formats = DefaultFormats + nexusSet.properties.get[String]("available") should equal ("2016-03-18T13:31:49.016770+00:00") + + nexusSet.properties.get[JArray]("categories").values should equal ( + JArray( + List(JString("Tablets"), + JString("Electronics"), + JString("Google"), + JString("Google2"))).values) distinctEventsAfterCount should equal (eventsAfterCount) eventsBeforeCount should be > (eventsAfterCount)
