Repository: incubator-predictionio Updated Branches: refs/heads/develop 119262cbf -> be3611d9d
[PIO-45] Fix SelfCleaningDatasource / HBase data deletion bug Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/be3611d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/be3611d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/be3611d9 Branch: refs/heads/develop Commit: be3611d9d00625975f1f64de4cc6c1c829d8f542 Parents: 119262c Author: EmergentOrder <[email protected]> Authored: Thu Nov 24 10:32:45 2016 -0600 Committer: EmergentOrder <[email protected]> Committed: Thu Nov 24 10:32:45 2016 -0600 ---------------------------------------------------------------------- .../predictionio/core/SelfCleaningDataSource.scala | 2 +- .../predictionio/core/SelfCleaningDataSourceTest.scala | 1 + .../predictionio/data/storage/jdbc/JDBCPEvents.scala | 11 +++++++++-- 3 files changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/be3611d9/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala index f54efce..9a2e68a 100644 --- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala +++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala @@ -180,7 +180,7 @@ trait SelfCleaningDataSource { sc: SparkContext ): Unit = { val (appId, channelId) = Common.appNameToId(appName, None) - pEventsDb.write(newEvents.map(x => recreateEvent(x, None, x.eventTime)), appId)(sc) + pEventsDb.write(newEvents, appId)(sc) removePEvents(eventsToRemove, appId, sc) } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/be3611d9/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala index 2a6f7bd..e1ed832 100644 --- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala +++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala @@ -80,6 +80,7 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo distinctEventsAfterCount should equal (eventsAfterCount) eventsBeforeCount should be > (eventsAfterCount) itemEventsBeforeCount should be > (itemEventsAfterCount) + itemEventsAfterCount should be > 0l } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/be3611d9/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala index b3f7870..538bad4 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala @@ -138,8 +138,15 @@ class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String , "prId" , "creationTime" , "creationTimeZone") - - val eventDF = events.map { event => + + val eventDF = events.map(x => + Event(eventId = None, event = x.event, entityType = x.entityType, + entityId = x.entityId, targetEntityType = x.targetEntityType, + targetEntityId = x.targetEntityId, properties = x.properties, + eventTime = x.eventTime, tags = x.tags, prId= x.prId, + creationTime = x.eventTime) + ) + .map { event => (event.eventId.getOrElse(JDBCUtils.generateId) , event.event , event.entityType
