Repository: incubator-predictionio Updated Branches: refs/heads/develop 8798547af -> 1a222657b
[PIO-45] Fix compression order in SelfCleaningDataSource. Thanks @jimlyndon Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/1a222657 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/1a222657 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/1a222657 Branch: refs/heads/develop Commit: 1a222657b8438b56b2ef04eb14af5a3cf441bad0 Parents: 8798547 Author: EmergentOrder <[email protected]> Authored: Fri Jan 27 10:56:23 2017 -0600 Committer: EmergentOrder <[email protected]> Committed: Fri Jan 27 10:56:23 2017 -0600 ---------------------------------------------------------------------- .../apache/predictionio/core/SelfCleaningDataSource.scala | 9 +++++---- .../predictionio/core/SelfCleaningDataSourceTest.scala | 6 +++++- core/src/test/scala/org/apache/predictionio/core/test.json | 1 + 3 files changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/1a222657/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala index 9a2e68a..dc80566 100644 --- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala +++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala @@ -229,7 +229,7 @@ trait SelfCleaningDataSource { */ @DeveloperApi def cleanPEvents(sc: SparkContext): RDD[Event] = { - val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime) + val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime, false) val rdd = eventWindow match { case Some(ew) => @@ -274,7 +274,7 @@ trait SelfCleaningDataSource { */ @DeveloperApi def cleanLEvents(): Iterable[Event] = { - val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime) + val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime).reverse val events = eventWindow match { case Some(ew) => @@ -305,13 +305,14 @@ trait SelfCleaningDataSource { e1.properties.fields .filterKeys(f => !e2.properties.fields.contains(f)) } - e1.copy(properties = DataMap(props)) + e1.copy(properties = DataMap(props), eventTime = e2.eventTime) } case None => events.reduce { (e1, e2) => e1.copy(properties = - DataMap(e1.properties.fields ++ e2.properties.fields) + DataMap(e1.properties.fields ++ e2.properties.fields), + eventTime = e2.eventTime ) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/1a222657/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala index e1ed832..1aed1ba 100644 --- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala +++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala @@ -76,7 +76,11 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo val itemEventsAfterCount = source.itemEvents(sc).count val distinctEventsAfterCount = eventsAfter.map(x => CleanedDataSourceTest.stripIdAndCreationTimeFromEvents(x)).distinct.count - + + val nexusSet = eventsAfter.filter(x => x.event == "$set" && x.entityId == "Nexus").take(1)(0) + + nexusSet.properties.get[String]("available") should equal ("2016-03-18T13:31:49.016770+00:00") + distinctEventsAfterCount should equal (eventsAfterCount) eventsBeforeCount should be > (eventsAfterCount) itemEventsBeforeCount should be > (itemEventsAfterCount) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/1a222657/core/src/test/scala/org/apache/predictionio/core/test.json ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/core/test.json b/core/src/test/scala/org/apache/predictionio/core/test.json index f38734f..7732d43 100644 --- a/core/src/test/scala/org/apache/predictionio/core/test.json +++ b/core/src/test/scala/org/apache/predictionio/core/test.json @@ -3,6 +3,7 @@ {"event":"$set","entityType":"item","entityId":"Nexus","properties":{"categories":["Tablets","Electronics","Google2"], "test": ["testA", "testB"]},"eventTime":"2006-03-17T15:54:49.941Z","creationTime":"2006-03-17T15:54:49.945Z"} {"eventId":"KpjNMVrQzY2s0TZhYB3vsAAAAVOFSkNogMMiTarDxQA","event":"$set","entityType":"item","entityId":"Nexus","properties":{"countries":["United States","Canada"]},"eventTime":"2016-03-17T15:55:49.992Z","creationTime":"2016-03-17T15:55:49.997Z"} {"eventId":"KpjNMVrQzY2s0TZhYB3vsAAAAVOFSkOdrr3SJaHTlQQ","event":"$set","entityType":"item","entityId":"Nexus","properties":{"available":"2016-03-14T13:31:49.016770+00:00","date":"2016-03-16T13:31:49.016770+00:00","expires":"2016-03-18T13:31:49.016770+00:00"},"eventTime":"2016-03-17T15:55:50.045Z","creationTime":"2016-03-17T15:55:50.049Z"} +{"eventId":"KpjNMVrQzY2s0TZhYB3vsAAAAVOFSkOdrr3SJaHTlQQ","event":"$set","entityType":"item","entityId":"Nexus","properties":{"available":"2016-03-18T13:31:49.016770+00:00","date":"2016-03-16T13:31:49.016770+00:00","expires":"2016-03-18T13:31:49.016770+00:00"},"eventTime":"2016-03-18T15:55:50.045Z","creationTime":"2016-03-18T15:55:50.049Z"} {"eventId":"MdgNfySNSsz0WVh1q6f3_gAAAVOFSkNKjmJz4kil3F0","event":"$set","entityType":"item","entityId":"Surface","properties":{"categories":["Tablets","Electronics","Microsoft"]},"eventTime":"2016-03-17T15:55:49.962Z","creationTime":"2016-03-17T15:55:49.966Z"} {"eventId":"MdgNfySNSsz0WVh1q6f3_gAAAVOFSkN-lNLH6dbWhjI","event":"$set","entityType":"item","entityId":"Surface","properties":{"countries":["United States","Canada"]},"eventTime":"2016-03-17T15:55:50.014Z","creationTime":"2016-03-17T15:55:50.018Z"} {"eventId":"MdgNfySNSsz0WVh1q6f3_gAAAVOFSkOmhp8HSvY0l2M","event":"$set","entityType":"item","entityId":"Surface","properties":{"available":"2016-03-15T08:43:49.016770+00:00","date":"2016-03-17T08:43:49.016770+00:00","expires":"2016-03-19T08:43:49.016770+00:00"},"eventTime":"2016-03-17T15:55:50.054Z","creationTime":"2016-03-17T15:55:50.060Z"}
