Yarish George created PIO-143:
---------------------------------

             Summary: $set - $unset events compression doesn't work
                 Key: PIO-143
                 URL: https://issues.apache.org/jira/browse/PIO-143
             Project: PredictionIO
          Issue Type: Bug
            Reporter: Yarish George


In my HBase event store I have two events:

{code:json}
{"event" : "$set", "entityType" : "user", "entityId" : "100", "properties" : 
{"test" : "0.8"}, "eventTime": "2017-12-19T21:02:49.228Z"}
{"event" : "$unset", "entityType" : "user", "entityId" : "100", "properties" : 
{"test" : null}, "eventTime": "2017-12-19T21:02:55.228Z"}
{code}
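
If I understand the compression correctly, these two events should merge into a single {{$set}} event with the {{test}} property removed - i.e., judging from the merged event visible in the stack trace below, something like:

{code:json}
{"event" : "$set", "entityType" : "user", "entityId" : "100", "properties" : {}, "eventTime": "2017-12-19T21:02:55.228Z"}
{code}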

Instead, when I try to compress them, the actual result is:

{code:java}
17/12/22 16:56:44 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$1
Serialization stack:
        - object not serializable (class: scala.collection.immutable.MapLike$$anon$1, value: Map())
        - field (class: org.apache.predictionio.data.storage.DataMap, name: fields, type: interface scala.collection.immutable.Map)
        - object (class org.apache.predictionio.data.storage.DataMap, DataMap(Map()))
        - field (class: org.apache.predictionio.data.storage.Event, name: properties, type: class org.apache.predictionio.data.storage.DataMap)
        - object (class org.apache.predictionio.data.storage.Event, Event(id=Some(oE62rWfkR2R4rjazOnUUagAAAWBwlgmMtO4Dcl8MjO0),event=$set,eType=user,eId=49006307,tType=None,tId=None,p=DataMap(Map()),t=2017-12-19T21:02:55.228Z,tags=List(),pKey=None,ct=2017-12-23T00:18:56.858Z))
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:145)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:184)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

My setup is:
* PIO platform = 0.11.0-incubating
* UR recommender = 0.6.0
* Scala = 2.10.6
* Elasticsearch = 1.7.5

My guess is that SelfCleaningDataSource produces a non-serializable map at line 306:

{code:java}
e1.properties.fields
  .filterKeys(f => !e2.properties.fields.contains(f))
{code}
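
For what it's worth, in Scala 2.10/2.11 {{Map.filterKeys}} returns a lazy view (the {{scala.collection.immutable.MapLike$$anon$1}} in the stack trace above) that does not implement {{Serializable}}, so an {{Event}} whose {{DataMap}} wraps such a view blows up during Java serialization in the shuffle. Below is a standalone reproduction together with the usual workaround of materializing the view via {{.map(identity)}} - a sketch only, not tested against the PredictionIO codebase:

{code:java}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FilterKeysRepro extends App {
  // In Scala 2.10/2.11, filterKeys returns a lazy view (MapLike$$anon$1)
  // that does not implement Serializable.
  val view: Map[String, String] = Map("test" -> "0.8").filterKeys(_ => false)

  // map(identity) rebuilds a strict immutable Map, which serializes fine.
  // (.toMap can be a no-op here because the view is already an
  // immutable.Map, which is why map(identity) is the usual workaround.)
  val strict: Map[String, String] = view.map(identity)

  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  oos.writeObject(strict) // ok
  oos.writeObject(view)   // throws java.io.NotSerializableException
}
{code}

If that is indeed the cause, appending {{.map(identity)}} to the {{filterKeys}} call above should fix the compression.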



