Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 119262cbf -> be3611d9d


[PIO-45] Fix SelfCleaningDatasource / HBase data deletion bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/be3611d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/be3611d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/be3611d9

Branch: refs/heads/develop
Commit: be3611d9d00625975f1f64de4cc6c1c829d8f542
Parents: 119262c
Author: EmergentOrder <[email protected]>
Authored: Thu Nov 24 10:32:45 2016 -0600
Committer: EmergentOrder <[email protected]>
Committed: Thu Nov 24 10:32:45 2016 -0600

----------------------------------------------------------------------
 .../predictionio/core/SelfCleaningDataSource.scala       |  2 +-
 .../predictionio/core/SelfCleaningDataSourceTest.scala   |  1 +
 .../predictionio/data/storage/jdbc/JDBCPEvents.scala     | 11 +++++++++--
 3 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/be3611d9/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
index f54efce..9a2e68a 100644
--- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
+++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
@@ -180,7 +180,7 @@ trait SelfCleaningDataSource {
     sc: SparkContext
   ): Unit = {
     val (appId, channelId) = Common.appNameToId(appName, None)
-    pEventsDb.write(newEvents.map(x => recreateEvent(x, None, x.eventTime)), appId)(sc)
+    pEventsDb.write(newEvents, appId)(sc)
 
     removePEvents(eventsToRemove, appId, sc)
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/be3611d9/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
index 2a6f7bd..e1ed832 100644
--- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
+++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
@@ -80,6 +80,7 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo
     distinctEventsAfterCount should equal (eventsAfterCount)
     eventsBeforeCount should be > (eventsAfterCount) 
     itemEventsBeforeCount should be > (itemEventsAfterCount)
+    itemEventsAfterCount should be > 0l
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/be3611d9/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
index b3f7870..538bad4 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -138,8 +138,15 @@ class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String
       , "prId"
       , "creationTime"
       , "creationTimeZone")
-
-    val eventDF = events.map { event =>
+ 
+    val eventDF = events.map(x => 
+                              Event(eventId = None, event = x.event, entityType = x.entityType,
+                              entityId = x.entityId, targetEntityType = x.targetEntityType,
+                              targetEntityId = x.targetEntityId, properties = x.properties,
+                              eventTime = x.eventTime, tags = x.tags, prId= x.prId,
+                              creationTime = x.eventTime)
+                            )
+                        .map { event =>
       (event.eventId.getOrElse(JDBCUtils.generateId)
         , event.event
         , event.entityType

Reply via email to